diff --git a/cereal/log.capnp b/cereal/log.capnp index 931b4b2f1b..c33fc763de 100644 --- a/cereal/log.capnp +++ b/cereal/log.capnp @@ -1180,7 +1180,8 @@ struct ModelDataV2 { laneChangeProb @15 :Float32; desireLog @16 : Text; modelTurnSpeed @17 :Float32; - + laneChangeAvailableLeft @18 :Bool; + laneChangeAvailableRight @19 :Bool; # deprecated brakeDisengageProbDEPRECATED @2 :Float32; diff --git a/common/params_keys.h b/common/params_keys.h index 2a274bc3d5..d3ba8e4640 100644 --- a/common/params_keys.h +++ b/common/params_keys.h @@ -141,7 +141,7 @@ inline static std::unordered_map keys = { {"GMapKey", {PERSISTENT, STRING}}, {"SearchInput", {PERSISTENT, INT}}, - {"CarSelected3", {PERSISTENT, STRING}}, + {"CarSelected3", {PERSISTENT, STRING, "MOCK"}}, {"SupportedCars", {PERSISTENT, STRING}}, {"SupportedCars_gm", {PERSISTENT, STRING}}, {"ShowDebugUI", {PERSISTENT, INT, "0"}}, @@ -222,6 +222,7 @@ inline static std::unordered_map keys = { {"SteerActuatorDelay", {PERSISTENT, INT, "0"}}, {"LatSmoothSec", {PERSISTENT, INT, "13"}}, + {"LatSuspendAngleDeg", {PERSISTENT, INT, "300"}}, {"CruiseOnDist", {PERSISTENT, INT, "400"}}, {"CruiseMaxVals0", {PERSISTENT, INT, "160"}}, diff --git a/opendbc_repo/opendbc/can/packer.py b/opendbc_repo/opendbc/can/packer.py index 882dc0b669..0736129384 100644 --- a/opendbc_repo/opendbc/can/packer.py +++ b/opendbc_repo/opendbc/can/packer.py @@ -8,7 +8,7 @@ def __init__(self, dbc_name: str): self.dbc = DBC(dbc_name) self.counters: dict[int, int] = {} - def pack(self, address: int, values: dict[str, float]) -> bytearray: + def pack(self, address: int, values: dict[str, float], rx_counter: int | None = None) -> bytearray: msg = self.dbc.addr_to_msg.get(address) if msg is None: return bytearray() @@ -27,8 +27,8 @@ def pack(self, address: int, values: dict[str, float]) -> bytearray: counter_set = True sig_counter = next((s for s in msg.sigs.values() if s.type == SignalType.COUNTER or s.name == "COUNTER"), None) if sig_counter and not counter_set: - if address not in self.counters: - self.counters[address] = 0 + if address not in self.counters: + self.counters[address] = 0 if rx_counter is None else (int(rx_counter) + 1) % (1 << sig_counter.size) set_value(dat, sig_counter, self.counters[address]) self.counters[address] = (self.counters[address] + 1) % (1 << sig_counter.size) sig_checksum = next((s for s in msg.sigs.values() if s.type > SignalType.COUNTER), None) @@ -37,7 +37,7 @@ def pack(self, address: int, values: dict[str, float]) -> bytearray: set_value(dat, sig_checksum, checksum) return dat - def make_can_msg(self, name_or_addr, bus: int, values: dict[str, float]): + def make_can_msg(self, name_or_addr, bus: int, values: dict[str, float], rx_counter: int | None = None): if isinstance(name_or_addr, int): addr = name_or_addr else: @@ -45,7 +45,7 @@ def make_can_msg(self, name_or_addr, bus: int, values: dict[str, float]): if msg is None: return 0, b'', bus addr = msg.address - dat = self.pack(addr, values) + dat = self.pack(addr, values, rx_counter = rx_counter) if len(dat) == 0: return 0, b'', bus return addr, bytes(dat), bus diff --git a/opendbc_repo/opendbc/car/hyundai/carcontroller.py b/opendbc_repo/opendbc/car/hyundai/carcontroller.py index 777feb65fa..02b0a59215 100644 --- a/opendbc_repo/opendbc/car/hyundai/carcontroller.py +++ b/opendbc_repo/opendbc/car/hyundai/carcontroller.py @@ -319,13 +319,12 @@ def update(self, CC, CS, now_nanos): if self.CP.flags & HyundaiFlags.CANFD: hda2 = self.CP.flags & HyundaiFlags.CANFD_HDA2 hda2_long = hda2 and self.CP.openpilotLongitudinalControl - # steering control if camera_scc: can_sends.extend(hyundaicanfd.create_steering_messages_camera_scc(self.frame, self.packer, self.CP, self.CAN, CC, apply_steer_req, apply_torque, CS, apply_angle, self.lkas_max_torque, angle_control)) else: can_sends.extend(hyundaicanfd.create_steering_messages(self.packer, self.CP, self.CAN, CC.enabled, apply_steer_req, apply_torque, apply_angle, self.lkas_max_torque, angle_control)) - + # prevent LFA from activating on HDA2 by sending "no lane lines detected" to ADAS ECU if self.frame % 5 == 0 and hda2 and not camera_scc: can_sends.extend(hyundaicanfd.create_suppress_lfa(self.packer, self.CAN, CS)) @@ -352,8 +351,10 @@ def update(self, CC, CS, now_nanos): can_sends.extend(hyundaicanfd.create_fca_warning_light(self.CP, self.packer, self.CAN, self.frame)) if self.frame % 2 == 0: if self.CP.flags & HyundaiFlags.CAMERA_SCC.value: - can_sends.append(hyundaicanfd.create_acc_control_scc2(self.packer, self.CAN, CC.enabled, self.accel_last, accel, stopping, CC.cruiseControl.override, - set_speed_in_units, hud_control, self.hyundai_jerk, CS)) + msg = hyundaicanfd.create_acc_control_scc2(self.packer, self.CAN, CC.enabled, self.accel_last, accel, stopping, CC.cruiseControl.override, + set_speed_in_units, hud_control, self.hyundai_jerk, CS) + if msg is not None: + can_sends.append(msg) can_sends.extend(hyundaicanfd.create_tcs_messages(self.packer, self.CAN, CS)) # for sorento SCC radar... else: can_sends.append(hyundaicanfd.create_acc_control(self.packer, self.CAN, CC.enabled, self.accel_last, accel, stopping, CC.cruiseControl.override, @@ -366,16 +367,16 @@ def update(self, CC, CS, now_nanos): can_sends.extend(hyundaicanfd.forward_button_message(self.packer, self.CAN, self.frame, CS, send_button, self.MainMode_ACC_trigger, self.LFA_trigger)) else: can_sends.extend(self.create_button_messages(CC, CS, use_clu11=False)) - else: - can_sends.append(hyundaican.create_lkas11(self.packer, self.frame, self.CP, apply_torque, apply_steer_req, - torque_fault, CS.lkas11, sys_warning, sys_state, CC.enabled, - hud_control.leftLaneVisible, hud_control.rightLaneVisible, - left_lane_warning, right_lane_warning, self.is_ldws_car)) + if CS.lkas11 is not None: + can_sends.append(hyundaican.create_lkas11(self.packer, self.frame, self.CP, apply_torque, apply_steer_req, + torque_fault, CS.lkas11, sys_warning, sys_state, CC.enabled, + hud_control.leftLaneVisible, hud_control.rightLaneVisible, + left_lane_warning, right_lane_warning, self.is_ldws_car)) if not self.CP.openpilotLongitudinalControl: can_sends.extend(self.create_button_messages(CC, CS, use_clu11=True)) - if self.CP.carFingerprint in CAN_GEARS["send_mdps12"]: # send mdps12 to LKAS to prevent LKAS error + if self.CP.carFingerprint in CAN_GEARS["send_mdps12"] and CS.mdps12 is not None: # send mdps12 to LKAS to prevent LKAS error can_sends.append(hyundaican.create_mdps12(self.packer, self.frame, CS.mdps12)) casper_opt = self.CP.carFingerprint in (CAR.HYUNDAI_CASPER_EV) @@ -464,7 +465,7 @@ def create_button_messages(self, CC: structs.CarControl, CS: CarState, use_clu11 if (self.frame - self.last_button_frame) * DT_CTRL > 0.1: print("cruiseControl.cancel222222") if self.CP.flags & HyundaiFlags.CANFD_ALT_BUTTONS: - #can_sends.append(hyundaicanfd.create_acc_cancel(self.packer, self.CP, self.CAN, CS.cruise_info)) + #can_sends.append(hyundaicanfd.create_acc_cancel(self.packer, self.CP, self.CAN, CS.scc_control)) if self.cruise_buttons_msg_values is not None: can_sends.append(hyundaicanfd.alt_cruise_buttons(self.packer, self.CP, self.CAN, Buttons.CANCEL, self.cruise_buttons_msg_values, self.cruise_buttons_msg_cnt)) diff --git a/opendbc_repo/opendbc/car/hyundai/carstate.py b/opendbc_repo/opendbc/car/hyundai/carstate.py index 7785201e53..fb121f2c89 100644 --- a/opendbc_repo/opendbc/car/hyundai/carstate.py +++ b/opendbc_repo/opendbc/car/hyundai/carstate.py @@ -76,24 +76,32 @@ def __init__(self, CP): self.is_metric = False self.buttons_counter = 0 - self.cruise_info = {} - self.lfa_info = {} - self.lfa_alt_info = {} - self.lfahda_cluster_info = None - self.adrv_info_161 = None - self.adrv_info_200 = None - self.adrv_info_1ea = None - self.adrv_info_160 = None - self.adrv_info_162 = None - self.hda_info_4a3 = None - self.new_msg_4b4 = None - self.tcs_info_373 = None - self.mdps_info = {} - self.steer_touch_info = {} - + # for generic CAN parsing + self.fca11 = None + self.scc11 = None + self.scc12 = None + self.scc13 = None + self.scc14 = None + self.lkas11 = None + self.clu11 = None + + # for CANFD parsing + self.scc_control = None + self.lfa = None + self.lfa_alt = None + self.lfahda_cluster = None + self.adrv_0x161 = None + self.adrv_0x200 = None + self.adrv_0x1ea = None + self.adrv_0x160 = None + self.ccnc_0x162 = None + self.hda_info_4a3 = None + self.tcs = None + self.mdps = None + self.steer_touch_2af = None self.cruise_buttons_msg = None - self.msg_0x362 = None - self.msg_0x2a4 = None + self.cam_0x362 = None + self.cam_0x2a4 = None # On some cars, CLU15->CF_Clu_VehicleSpeed can oscillate faster than the dash updates. Sample at 5 Hz self.cluster_speed = 0 @@ -128,67 +136,115 @@ def __init__(self, CP): if self.CP.openpilotLongitudinalControl and not (self.CP.flags & HyundaiFlags.CANFD_CAMERA_SCC): ecu_disabled = True - if ecu_disabled: - self.SCC11 = self.SCC12 = self.SCC13 = self.SCC14 = self.FCA11 = False - else: - bus_cruise = 2 if self.CP.flags & HyundaiFlags.CAMERA_SCC else 0 - self.SCC11 = True if 1056 in fingerprints[bus_cruise] else False - self.SCC12 = True if 1057 in fingerprints[bus_cruise] else False - self.SCC13 = True if 1290 in fingerprints[bus_cruise] else False - self.SCC14 = True if 905 in fingerprints[bus_cruise] else False - self.FCA11 = False - self.FCA11_bus = Bus.cam - + self.HAS_LFA_BUTTON = True if 913 in fingerprints[0] else False self.CRUISE_BUTTON_ALT = True if 1007 in fingerprints[0] else False cam_bus = CanBus(CP).CAM pt_bus = CanBus(CP).ECAN alt_bus = CanBus(CP).ACAN - self.CCNC_0x161 = True if 0x161 in fingerprints[cam_bus] else False - self.CCNC_0x162 = True if 0x162 in fingerprints[cam_bus] else False - self.ADRV_0x200 = True if 0x200 in fingerprints[cam_bus] else False - self.ADRV_0x1ea = True if 0x1ea in fingerprints[cam_bus] else False - self.ADRV_0x160 = True if 0x160 in fingerprints[cam_bus] else False - self.LFAHDA_CLUSTER = True if 480 in fingerprints[cam_bus] else False - self.HDA_INFO_4A3 = True if 0x4a3 in fingerprints[pt_bus] else False - self.NEW_MSG_4B4 = True if 0x4b4 in fingerprints[pt_bus] else False self.GEAR = True if 69 in fingerprints[pt_bus] else False self.GEAR_ALT = True if 64 in fingerprints[pt_bus] else False - self.CAM_0x362 = True if 0x362 in fingerprints[alt_bus] else False - self.CAM_0x2a4 = True if 0x2a4 in fingerprints[alt_bus] else False - self.STEER_TOUCH_2AF = True if 0x2af in fingerprints[pt_bus] else False self.TPMS = True if 0x3a0 in fingerprints[pt_bus] else False self.LOCAL_TIME = True if 1264 in fingerprints[pt_bus] else False self.cp_bsm = None self.time_zone = "UTC" + self.cp = None + self.cp_cam = None + self.cp_alt = None self.controls_ready_count = 0 - def update(self, can_parsers) -> structs.CarState: - + def monitor_fingerprint(self, can_parsers, canfd): if self.controls_ready_count <= 200: if Params().get_bool("ControlsReady"): self.controls_ready_count += 1 + self.cp = can_parsers[Bus.pt] + self.cp_cam = can_parsers[Bus.cam] + self.cp_alt = can_parsers[Bus.alt] if Bus.alt in can_parsers else None + + def add_if_seen(parser, name): + msg = parser.dbc.name_to_msg.get(name) + if not msg: + print(f"{name} not in DBC") + return + if msg.address not in parser.seen_addresses: + return + if msg.address in parser.addresses: + return + parser._add_message(name) # ← 이름으로 등록 + + def add_and_cache(parser, name: str, attr: str): + add_if_seen(parser, name) + if name in parser.vl: # 등록 성공했을 때만 + setattr(self, attr, parser.vl[name]) + return True + return False + + if self.controls_ready_count == 50: + self.cp.controls_ready = self.cp_cam.controls_ready = True + if self.cp_alt is not None: + self.cp_alt.controls_ready = True + elif self.controls_ready_count == 100: + self.cp.enable_capture = self.cp_cam.enable_capture = False + if self.cp_alt is not None: + self.cp_alt.enable_capture = False + elif self.controls_ready_count == 101: + print("cp_cam.seen_addresses =", self.cp_cam.seen_addresses) + elif self.controls_ready_count == 102: + print("cp.seen_addresses =", self.cp.seen_addresses) + elif self.controls_ready_count == 103: + if self.cp_alt is not None: + print("cp_alt.seen_addresses =", self.cp_alt.seen_addresses) + else: + print("cp_alt.seen_addresses = None") + if not canfd: + if self.controls_ready_count == 104: + if not add_and_cache(self.cp_cam, "FCA11", "fca11"): + add_and_cache(self.cp, "FCA11", "fca11") + add_and_cache(self.cp_cam, "LKAS11", "lkas11") + add_and_cache(self.cp, "CLU11", "clu11") + elif self.controls_ready_count == 105: + cp_cruise = self.cp_cam if self.CP.flags & HyundaiFlags.CAMERA_SCC else self.cp + add_and_cache(cp_cruise, "SCC11", "scc11") + add_and_cache(cp_cruise, "SCC12", "scc12") + add_and_cache(cp_cruise, "SCC13", "scc13") + add_and_cache(cp_cruise, "SCC14", "scc14") + else: # canfd + if self.controls_ready_count == 120: + cp_cruise = self.cp_cam if self.CP.flags & HyundaiFlags.CANFD_CAMERA_SCC else self.cp + add_and_cache(cp_cruise, "SCC_CONTROL", "scc_control") + elif self.controls_ready_count == 121: + add_and_cache(self.cp_cam, "LFA", "lfa") + add_and_cache(self.cp_cam, "LFA_ALT", "lfa_alt") + add_and_cache(self.cp_cam, "LFAHDA_CLUSTER", "lfahda_cluster") + elif self.controls_ready_count == 122: + add_and_cache(self.cp_cam, "ADRV_0x161", "adrv_0x161") + add_and_cache(self.cp_cam, "ADRV_0x200", "adrv_0x200") + add_and_cache(self.cp_cam, "ADRV_0x1ea", "adrv_0x1ea") + add_and_cache(self.cp_cam, "ADRV_0x160", "adrv_0x160") + add_and_cache(self.cp_cam, "CCNC_0x162", "ccnc_0x162") + elif self.controls_ready_count == 123: + add_and_cache(self.cp, "HDA_INFO_4A3", "hda_info_4a3") + add_and_cache(self.cp, "TCS", "tcs") + add_and_cache(self.cp, "MDPS", "mdps") + add_and_cache(self.cp, "STEER_TOUCH_2AF", "steer_touch_2af") + elif self.controls_ready_count == 124: + add_and_cache(self.cp, self.cruise_btns_msg_canfd, "cruise_buttons_msg") + if not add_and_cache(self.cp_cam, "CAM_0x362", "cam_0x362") and self.cp_alt is not None: + add_and_cache(self.cp_alt, "CAM_0x362", "cam_0x362") + if not add_and_cache(self.cp_alt, "CAM_0x2a4", "cam_0x2a4") and self.cp_cam is not None: + add_and_cache(self.cp_cam, "CAM_0x2a4", "cam_0x2a4") + + + + + def update(self, can_parsers) -> structs.CarState: + self.monitor_fingerprint(can_parsers, self.CP.flags & HyundaiFlags.CANFD) cp = can_parsers[Bus.pt] cp_cam = can_parsers[Bus.cam] cp_alt = can_parsers[Bus.alt] if Bus.alt in can_parsers else None - if self.controls_ready_count == 50: - cp.controls_ready = cp_cam.controls_ready = True - if cp_alt is not None: - cp_alt.controls_ready = True - elif self.controls_ready_count == 100: - print("cp_cam.seen_addresses =", cp_cam.seen_addresses) - print("cp.seen_addresses =", cp.seen_addresses) - if 909 in cp_cam.seen_addresses: - self.FCA11 = True - self.FCA11_bus = Bus.cam - elif 909 in cp.seen_addresses: - self.FCA11 = True - self.FCA11_bus = Bus.pt - if cp_alt is not None: - print("cp_alt.seen_addresses =", cp_alt.seen_addresses) if self.CP.flags & HyundaiFlags.CANFD: return self.update_canfd(can_parsers) @@ -325,9 +381,6 @@ def update(self, can_parsers) -> structs.CarState: ret.leftBlindspot = cp.vl["LCA11"]["CF_Lca_IndLeft"] != 0 ret.rightBlindspot = cp.vl["LCA11"]["CF_Lca_IndRight"] != 0 - # save the entire LKAS11 and CLU11 - self.lkas11 = copy.copy(cp_cam.vl["LKAS11"]) - self.clu11 = copy.copy(cp.vl["CLU11"]) self.steer_state = cp.vl["MDPS12"]["CF_Mdps_ToiActive"] # 0 NOT ACTIVE, 1 ACTIVE prev_cruise_buttons = self.cruise_buttons[-1] #self.cruise_buttons.extend(cp.vl_all["CLU11"]["CF_Clu_CruiseSwState"]) @@ -364,11 +417,6 @@ def update(self, can_parsers) -> structs.CarState: ret.tpms.rl = tpms_unit * cp.vl["TPMS11"]["PRESSURE_RL"] ret.tpms.rr = tpms_unit * cp.vl["TPMS11"]["PRESSURE_RR"] - self.scc11 = cp_cruise.vl["SCC11"] if self.SCC11 else None - self.scc12 = cp_cruise.vl["SCC12"] if self.SCC12 else None - self.scc13 = cp_cruise.vl["SCC13"] if self.SCC13 else None - self.scc14 = cp_cruise.vl["SCC14"] if self.SCC14 else None - self.fca11 = can_parsers[self.FCA11_bus].vl["FCA11"] if self.FCA11 else None cluSpeed = cp.vl["CLU11"]["CF_Clu_Vanz"] decimal = cp.vl["CLU11"]["CF_Clu_VanzDecimal"] if 0. < decimal < 0.5: @@ -466,10 +514,6 @@ def update_canfd(self, can_parsers) -> structs.CarState: ret.steerFaultTemporary = cp.vl["MDPS"]["LKA_FAULT"] != 0 or cp.vl["MDPS"]["LFA2_FAULT"] != 0 #ret.steerFaultTemporary = False - self.mdps_info = copy.copy(cp.vl["MDPS"]) - if self.STEER_TOUCH_2AF: - self.steer_touch_info = cp.vl["STEER_TOUCH_2AF"] - blinkers_info = cp.vl["BLINKERS"] left_blinker_lamp = blinkers_info["LEFT_LAMP"] or blinkers_info["LEFT_LAMP_ALT"] right_blinker_lamp = blinkers_info["RIGHT_LAMP"] or blinkers_info["RIGHT_LAMP_ALT"] @@ -512,40 +556,27 @@ def update_canfd(self, can_parsers) -> structs.CarState: ret.pcmCruiseGap = int(np.clip(cp_cruise_info.vl["SCC_CONTROL"]["DISTANCE_SETTING"], 1, 4)) ret.cruiseState.standstill = cp_cruise_info.vl["SCC_CONTROL"]["InfoDisplay"] >= 4 ret.cruiseState.speed = cp_cruise_info.vl["SCC_CONTROL"]["VSetDis"] * speed_factor - self.cruise_info = copy.copy(cp_cruise_info.vl["SCC_CONTROL"]) ret.brakeHoldActive = cp.vl["ESP_STATUS"]["AUTO_HOLD"] == 1 and cp_cruise_info.vl["SCC_CONTROL"]["ACCMode"] not in (1, 2) speed_limit_cam = False if self.CP.flags & HyundaiFlags.CAMERA_SCC.value: - self.cruise_info = copy.copy(cp_cam.vl["SCC_CONTROL"]) - self.lfa_info = copy.copy(cp_cam.vl["LFA"]) - if self.CP.flags & HyundaiFlags.ANGLE_CONTROL.value: - self.lfa_alt_info = copy.copy(cp_cam.vl["LFA_ALT"]) - - if self.LFAHDA_CLUSTER: - self.lfahda_cluster_info = cp_cam.vl["LFAHDA_CLUSTER"] - corner = False - self.adrv_info_161 = cp_cam.vl["ADRV_0x161"] if self.CCNC_0x161 else None - self.adrv_info_162 = cp_cam.vl["CCNC_0x162"] if self.CCNC_0x162 else None - if self.adrv_info_161 is not None: - ret.leftLongDist = self.lf_distance = self.adrv_info_162["LF_DETECT_DISTANCE"] - ret.rightLongDist = self.rf_distance = self.adrv_info_162["RF_DETECT_DISTANCE"] - self.lr_distance = self.adrv_info_162["LR_DETECT_DISTANCE"] - self.rr_distance = self.adrv_info_162["RR_DETECT_DISTANCE"] - ret.leftLatDist = self.adrv_info_162["LF_DETECT_LATERAL"] - ret.rightLatDist = self.adrv_info_162["RF_DETECT_LATERAL"] + if self.ccnc_0x162 is not None: + ret.leftLongDist = self.lf_distance = self.ccnc_0x162["LF_DETECT_DISTANCE"] + ret.rightLongDist = self.rf_distance = self.ccnc_0x162["RF_DETECT_DISTANCE"] + self.lr_distance = self.ccnc_0x162["LR_DETECT_DISTANCE"] + self.rr_distance = self.ccnc_0x162["RR_DETECT_DISTANCE"] + ret.leftLatDist = self.ccnc_0x162["LF_DETECT_LATERAL"] + ret.rightLatDist = self.ccnc_0x162["RF_DETECT_LATERAL"] corner = True - self.adrv_info_200 = cp_cam.vl["ADRV_0x200"] if self.ADRV_0x200 else None - self.adrv_info_1ea = cp_cam.vl["ADRV_0x1ea"] if self.ADRV_0x1ea else None - if self.adrv_info_1ea is not None: + if self.adrv_0x1ea is not None: if not corner: - ret.leftLongDist = self.adrv_info_1ea["LF_DETECT_DISTANCE"] - ret.rightLongDist = self.adrv_info_1ea["RF_DETECT_DISTANCE"] - self.lr_distance = self.adrv_info_1ea["LR_DETECT_DISTANCE"] - self.rr_distance = self.adrv_info_1ea["RR_DETECT_DISTANCE"] - ret.leftLatDist = self.adrv_info_1ea["LF_DETECT_LATERAL"] - ret.rightLatDist = self.adrv_info_1ea["RF_DETECT_LATERAL"] + ret.leftLongDist = self.adrv_0x1ea["LF_DETECT_DISTANCE"] + ret.rightLongDist = self.adrv_0x1ea["RF_DETECT_DISTANCE"] + self.lr_distance = self.adrv_0x1ea["LR_DETECT_DISTANCE"] + self.rr_distance = self.adrv_0x1ea["RR_DETECT_DISTANCE"] + ret.leftLatDist = self.adrv_0x1ea["LF_DETECT_LATERAL"] + ret.rightLatDist = self.adrv_0x1ea["RF_DETECT_LATERAL"] corner = True if corner: left_block = True if 0 < ret.leftLongDist < 7.0 or 0 < self.lr_distance < 7.0 else False @@ -555,9 +586,6 @@ def update_canfd(self, can_parsers) -> structs.CarState: if right_block: ret.rightBlindspot = True - self.adrv_info_160 = cp_cam.vl["ADRV_0x160"] if self.ADRV_0x160 else None - - self.hda_info_4a3 = cp.vl["HDA_INFO_4A3"] if self.HDA_INFO_4A3 else None if self.hda_info_4a3 is not None: speedLimit = self.hda_info_4a3["SPEED_LIMIT"] if not self.is_metric: @@ -570,18 +598,13 @@ def update_canfd(self, can_parsers) -> structs.CarState: country_code = int(self.hda_info_4a3["CountryCode"]) self.time_zone = ZoneInfo(NUMERIC_TO_TZ.get(country_code, "UTC")) - self.new_msg_4b4 = cp.vl["NEW_MSG_4B4"] if self.NEW_MSG_4B4 else None - self.tcs_info_373 = cp.vl["TCS"] - ret.gearStep = cp.vl["GEAR"]["GEAR_STEP"] if self.GEAR else 0 if 1 <= ret.gearStep <= 8 and ret.gearShifter == GearShifter.unknown: ret.gearShifter = GearShifter.drive ret.gearStep = cp.vl["GEAR_ALT"]["GEAR_STEP"] if self.GEAR_ALT else ret.gearStep if cp_alt and self.CP.flags & HyundaiFlags.CAMERA_SCC: - lane_info = None - lane_info = cp_alt.vl["CAM_0x362"] if self.CAM_0x362 else None - lane_info = cp_alt.vl["CAM_0x2a4"] if self.CAM_0x2a4 else lane_info + lane_info = self.cam_0x2a4 if self.cam_0x2a4 is not None else self.cam_0x362 if lane_info is not None: left_lane_prob = lane_info["LEFT_LANE_PROB"] @@ -644,12 +667,6 @@ def update_canfd(self, can_parsers) -> structs.CarState: self.buttons_counter = cp.vl[self.cruise_btns_msg_canfd]["COUNTER"] ret.accFaulted = cp.vl["TCS"]["ACCEnable"] != 0 # 0 ACC CONTROL ENABLED, 1-3 ACC CONTROL DISABLED - if not (self.CP.flags & HyundaiFlags.CAMERA_SCC): - if self.msg_0x362 is not None or 0x362 in cp_cam.seen_addresses: - self.msg_0x362 = cp_cam.vl["CAM_0x362"] - elif self.msg_0x2a4 is not None or 0x2a4 in cp_cam.seen_addresses: - self.msg_0x2a4 = cp_cam.vl["CAM_0x2a4"] - speed_conv = CV.KPH_TO_MS # if self.is_metric else CV.MPH_TO_MS cluSpeed = cp.vl["CRUISE_BUTTONS_ALT"]["CLU_SPEED"] ret.vEgoCluster = cluSpeed * speed_conv # MPH단위에서도 KPH로 나오는듯.. diff --git a/opendbc_repo/opendbc/car/hyundai/hyundaicanfd.py b/opendbc_repo/opendbc/car/hyundai/hyundaicanfd.py index bb18cc0e33..30f7116f1a 100644 --- a/opendbc_repo/opendbc/car/hyundai/hyundaicanfd.py +++ b/opendbc_repo/opendbc/car/hyundai/hyundaicanfd.py @@ -89,27 +89,29 @@ def CAM(self): def create_steering_messages_camera_scc(frame, packer, CP, CAN, CC, lat_active, apply_steer, CS, apply_angle, max_torque, angle_control): emergency_steering = False - if CS.adrv_info_161 is not None: - values = CS.adrv_info_161 + if CS.adrv_0x161 is not None: + values = CS.adrv_0x161 emergency_steering = values["ALERTS_1"] in [11, 12, 13, 14, 15, 21, 22, 23, 24, 25, 26] ret = [] - values = copy.copy(CS.mdps_info) - if angle_control: - if CS.lfa_alt_info is not None: - values["LFA2_ACTIVE"] = CS.lfa_alt_info["LKAS_ANGLE_ACTIVE"] - else: - if CS.lfa_info is not None: - values["LKA_ACTIVE"] = 1 if CS.lfa_info["STEER_REQ"] == 1 else 0 + if CS.mdps is not None: + values = copy.copy(CS.mdps) + rx_counter = values.pop("COUNTER", None) + if angle_control: + if CS.lfa_alt is not None: + values["LFA2_ACTIVE"] = CS.lfa_alt["LKAS_ANGLE_ACTIVE"] + else: + if CS.lfa is not None: + values["LKA_ACTIVE"] = 1 if CS.lfa["STEER_REQ"] == 1 else 0 - if frame % 1000 < 40: - values["STEERING_COL_TORQUE"] += 220 - ret.append(packer.make_can_msg("MDPS", CAN.CAM, values)) + if frame % 1000 < 40: + values["STEERING_COL_TORQUE"] += 220 + ret.append(packer.make_can_msg("MDPS", CAN.CAM, values, rx_counter = rx_counter)) if frame % 10 == 0: - if CS.steer_touch_info is not None: - values = copy.copy(CS.steer_touch_info) + if CS.steer_touch_2af is not None: + values = copy.copy(CS.steer_touch_2af) if frame % 1000 < 40: values["TOUCH_DETECT"] = 3 values["TOUCH1"] = 50 @@ -121,29 +123,35 @@ def create_steering_messages_camera_scc(frame, packer, CP, CAN, CC, lat_active, ret.append(packer.make_can_msg("STEER_TOUCH_2AF", CAN.CAM, values)) if angle_control: - if emergency_steering: - values = copy.copy(CS.lfa_alt_info) - else: - values = {} #CS.lfa_alt_info - values["LKAS_ANGLE_ACTIVE"] = 2 if CC.latActive else 1 - values["LKAS_ANGLE_CMD"] = -apply_angle - values["LKAS_ANGLE_MAX_TORQUE"] = max_torque if CC.latActive else 0 - ret.append(packer.make_can_msg("LFA_ALT", CAN.ECAN, values)) - - values = copy.copy(CS.lfa_info) - if not emergency_steering: - values["LKA_MODE"] = 0 - values["LKA_ICON"] = 2 if CC.latActive else 1 - values["TORQUE_REQUEST"] = -1024 # apply_steer, - values["VALUE63"] = 0 # LKA_ASSIST - values["STEER_REQ"] = 0 # 1 if lat_active else 0, - values["HAS_LANE_SAFETY"] = 0 # hide LKAS settings - values["LKA_ACTIVE"] = 3 if CC.latActive else 0 # this changes sometimes, 3 seems to indicate engaged - values["VALUE64"] = 0 #STEER_MODE, NEW_SIGNAL_2 - values["LKAS_ANGLE_CMD"] = -25.6 #-apply_angle, - values["LKAS_ANGLE_ACTIVE"] = 0 #2 if lat_active else 1, - values["LKAS_ANGLE_MAX_TORQUE"] = 0 #max_torque if lat_active else 0, - values["NEW_SIGNAL_1"] = 10 + if CS.lfa_alt is not None: + values = copy.copy(CS.lfa_alt) + rx_counter = values.pop("COUNTER", None) + if emergency_steering: + pass + else: + #values = {} #CS.lfa_alt + values["LKAS_ANGLE_ACTIVE"] = 2 if CC.latActive else 1 + values["LKAS_ANGLE_CMD"] = -apply_angle + values["LKAS_ANGLE_MAX_TORQUE"] = max_torque if CC.latActive else 0 + ret.append(packer.make_can_msg("LFA_ALT", CAN.ECAN, values, rx_counter = rx_counter)) + + if CS.lfa is not None: + values = copy.copy(CS.lfa) + rx_counter = values.pop("COUNTER", None) + if not emergency_steering: + values["LKA_MODE"] = 0 + values["LKA_ICON"] = 2 if CC.latActive else 1 + values["TORQUE_REQUEST"] = -1024 # apply_steer, + values["VALUE63"] = 0 # LKA_ASSIST + values["STEER_REQ"] = 0 # 1 if lat_active else 0, + values["HAS_LANE_SAFETY"] = 0 # hide LKAS settings + values["LKA_ACTIVE"] = 3 if CC.latActive else 0 # this changes sometimes, 3 seems to indicate engaged + values["VALUE64"] = 0 #STEER_MODE, NEW_SIGNAL_2 + values["LKAS_ANGLE_CMD"] = -25.6 #-apply_angle, + values["LKAS_ANGLE_ACTIVE"] = 0 #2 if lat_active else 1, + values["LKAS_ANGLE_MAX_TORQUE"] = 0 #max_torque if lat_active else 0, + values["NEW_SIGNAL_1"] = 10 + ret.append(packer.make_can_msg("LFA", CAN.ECAN, values, rx_counter = rx_counter)) else: values = {} @@ -160,7 +168,7 @@ def create_steering_messages_camera_scc(frame, packer, CP, CAN, CC, lat_active, #values["VALUE82_SET256"] = 0 - ret.append(packer.make_can_msg("LFA", CAN.ECAN, values)) + ret.append(packer.make_can_msg("LFA", CAN.ECAN, values)) return ret @@ -214,12 +222,12 @@ def create_steering_messages(packer, CP, CAN, enabled, lat_active, apply_steer, return ret def create_suppress_lfa(packer, CAN, CS): - if CS.msg_0x362 is not None: + if CS.cam_0x362 is not None: suppress_msg = "CAM_0x362" - lfa_block_msg = CS.msg_0x362 - elif CS.msg_0x2a4 is not None: + lfa_block_msg = CS.cam_0x362 + elif CS.cam_0x2a4 is not None: suppress_msg = "CAM_0x2a4" - lfa_block_msg = CS.msg_0x2a4 + lfa_block_msg = CS.cam_0x2a4 else: return [] @@ -275,7 +283,7 @@ def create_acc_cancel(packer, CP, CAN, cruise_info_copy): def create_lfahda_cluster(packer, CS, CAN, long_active, lat_active): - if CS.lfahda_cluster_info is not None: + if CS.lfahda_cluster is not None: values = {} # values["HDA_CntrlModSta"] = 2 if long_active else 0 values["HDA_LFA_SymSta"] = 2 if lat_active else 0 @@ -287,6 +295,9 @@ def create_lfahda_cluster(packer, CS, CAN, long_active, lat_active): def create_acc_control_scc2(packer, CAN, enabled, accel_last, accel, stopping, gas_override, set_speed, hud_control, hyundai_jerk, CS): + + if CS.scc_control is None: + return None enabled = (enabled or CS.softHoldActive > 0) and CS.paddle_button_prev == 0 acc_mode = 0 if not enabled else (2 if gas_override else 1) @@ -309,8 +320,8 @@ def create_acc_control_scc2(packer, CAN, enabled, accel_last, accel, stopping, g a_raw = accel a_val = accel #np.clip(accel, accel_last - jn, accel_last + jn) - values = copy.copy(CS.cruise_info) - values.pop("COUNTER", None) + values = copy.copy(CS.scc_control) + rx_counter = values.pop("COUNTER", None) values["ACCMode"] = acc_mode values["MainMode_ACC"] = 1 values["StopReq"] = 1 if stopping or CS.softHoldActive > 0 else 0 # 1: Stop control is required, 2: Not used, 3: Error Indicator @@ -358,7 +369,7 @@ def create_acc_control_scc2(packer, CAN, enabled, accel_last, accel, stopping, g values["ZEROS_7"] = 1 - return packer.make_can_msg("SCC_CONTROL", CAN.ECAN, values) + return packer.make_can_msg("SCC_CONTROL", CAN.ECAN, values, rx_counter = rx_counter) def create_acc_control(packer, CAN, enabled, accel_last, accel, stopping, gas_override, set_speed, hud_control, jerk_u, jerk_l, CS): @@ -437,8 +448,9 @@ def create_fca_warning_light(CP, packer, CAN, frame): def create_tcs_messages(packer, CAN, CS): ret = [] - if CS.tcs_info_373 is not None: - values = copy.copy(CS.tcs_info_373) + if CS.tcs is not None: + values = copy.copy(CS.tcs) + rx_counter = values.pop("COUNTER", None) values["DriverBraking"] = 0 values["NEW_SIGNAL_20"] = 0 values["NEW_SIGNAL_11"] = 0 @@ -446,7 +458,7 @@ def create_tcs_messages(packer, CAN, CS): #values["NEW_SIGNAL_1"] = 0 # accel과 관련.. 옆두부 꺼지는것과 관련? 확인필요 #values["ACC_REQ"] = 1 # 옆두부 꺼지는것과 관련? 확인필요.. 항상 켜지게함.. values["NEW_SIGNAL_1"] = 0 if values["ACC_REQ"] == 1 else 1 # 옆두부.. - ret.append(packer.make_can_msg("TCS", CAN.CAM, values)) + ret.append(packer.make_can_msg("TCS", CAN.CAM, values, rx_counter = rx_counter)) return ret def forward_button_message(packer, CAN, frame, CS, cruise_button, MainMode_ACC_trigger, LFA_trigger): @@ -454,6 +466,7 @@ def forward_button_message(packer, CAN, frame, CS, cruise_button, MainMode_ACC_t if frame % 2 == 0: if CS.cruise_buttons_msg is not None: values = copy.copy(CS.cruise_buttons_msg) + rx_counter = values.pop("COUNTER", None) cruise_button_driver = values["CRUISE_BUTTONS"] if cruise_button_driver == 0: values["CRUISE_BUTTONS"] = cruise_button @@ -463,288 +476,9 @@ def forward_button_message(packer, CAN, frame, CS, cruise_button, MainMode_ACC_t elif LFA_trigger > 0: values["LFA_BTN"] = 1 - ret.append(packer.make_can_msg(CS.cruise_btns_msg_canfd, CAN.CAM, values)) + ret.append(packer.make_can_msg(CS.cruise_btns_msg_canfd, CAN.CAM, values, rx_counter = rx_counter)) return ret -""" -def _make_ccnc_values___(values, CS, lat_active, frame, hud_control, lane_line = True, corner_radar = True): - if lane_line: - curvature = round(CS.out.steeringAngleDeg / 3) - values["LANELINE_CURVATURE"] = (min(abs(curvature), 15) + (-1 if curvature < 0 else 0)) if lat_active else 0 - values["LANELINE_CURVATURE_DIRECTION"] = 1 if curvature < 0 and lat_active else 0 - - md = CS.MD - if md is not None: - desire = md.meta.desire.raw - if desire == 1: # # 좌회전 - values['LANE_CHANGING'] = 1 # 왼쪽 화살표 - values["LANELINE_CURVATURE"] = 15 # 커브 최대 - values["LANELINE_CURVATURE_DIRECTION"] = 0 # 왼쪽으로 - - elif desire == 2: # 우회전 - values['LANE_CHANGING'] = 2 # 오른쪽 화살표 - values["LANELINE_CURVATURE"] = 15 # 차선커브 최대로 - values["LANELINE_CURVATURE_DIRECTION"] = 1 # 오른쪽으로 - - elif desire == 3: # 좌차선변경 - values['LANE_CHANGING'] = 3 # 왼쪽 화살표 + 바닥 - - elif desire == 4: # 우차선변경 - values['LANE_CHANGING'] = 4 # 오른쪽 화살표 + 바닥 - - if corner_radar: - if values['LF_DETECT'] >= 4 and values['LF_DETECT_DISTANCE'] != 0: values['LF_DETECT'] = 1 - if values['RF_DETECT'] >= 4 and values['RF_DETECT_DISTANCE'] != 0: values['RF_DETECT'] = 1 - if values['LR_DETECT'] >= 4 and values['LR_DETECT_DISTANCE'] != 0: values['LR_DETECT'] = 1 - if values['RR_DETECT'] >= 4 and values['RR_DETECT_DISTANCE'] != 0: values['RR_DETECT'] = 1 - - disp_dist = 30.0 - min_dist = 14.0 - max_interval = 100 - t = 1.0 # 이 값만 바꾸면 전체 깜빡임 속도 조절됨 (0.6 빠름, 1.0 기본, 1.5 느림) - def apply_one(detect_key, dist_key): - dist = values.get(dist_key, 0.0) - if dist <= min_dist: - return - d = min(dist, disp_dist) - interval = int((1 + (max_interval - 1) * (d / disp_dist)) * t) - interval = max(1, min(interval, max_interval)) - blink = (frame // interval) & 1 - values[detect_key] = 2 - blink - values[dist_key] = min_dist - - apply_one('LR_DETECT', 'LR_DETECT_DISTANCE') - apply_one('RR_DETECT', 'RR_DETECT_DISTANCE') - -def create_ccnc_messages___(CP, packer, CAN, frame, CC, CS, hud_control, disp_angle, left_lane_warning, right_lane_warning, enable_corner_radar): - ret = [] - md = CS.MD - desire = 0 - lane_changing = 0 - if md is not None: - desire = md.meta.desire.raw - desire_state = md.meta.desireState - if len(desire_state) > 4: - if desire_state[1] > 0.3 : lane_changing = 1 - if desire_state[2] > 0.3 : lane_changing = 2 - if desire_state[3] > 0.3 : lane_changing = 3 - if desire_state[4] > 0.3 : lane_changing = 4 - - if CP.flags & HyundaiFlags.CAMERA_SCC.value: - HDA_CntrlModSta = 0 - if CS.lfahda_cluster_info is not None: - HDA_CntrlModSta = CS.lfahda_cluster_info["HDA_CntrlModSta"] - - if frame % 2 == 0: - if CS.adrv_info_160 is not None: - values = copy.copy(CS.adrv_info_160) - #values["NEW_SIGNAL_1"] = 0 # steer_temp관련없음, 계기판에러 - #values["SET_ME_9"] = 17 # steer_temp관련없음, 계기판에러 - #values["SET_ME_2"] = 0 #커멘트해도 steer_temp에러남, 2값은 콤마에서 찾은거니... - #values["DATA102"] = 0 # steer_temp관련없음 - ret.append(packer.make_can_msg("ADRV_0x160", CAN.ECAN, values)) - - if CS.cruise_buttons_msg is not None: - values = copy.copy(CS.cruise_buttons_msg) - if CS.lfahda_cluster_info["HDA_LFA_SymSta"] == 0 and 0 < frame % 200 < 12: - values["LFA_BTN"] = 1 - #else: - # values["LFA_BTN"] = 0 - - if CC.enabled and CS.MainMode_ACC: - if CS.ACCMode in [0, 4] and 10 < frame % 200 < 22: - values["CRUISE_BUTTONS"] = 2 - elif CC.enabled and not CS.MainMode_ACC and 10 < frame % 200 <= 16 and CS.out.vEgo > 3.: - values["ADAPTIVE_CRUISE_MAIN_BTN"] = 1 - else: - values["ADAPTIVE_CRUISE_MAIN_BTN"] = 0 - - ret.append(packer.make_can_msg(CS.cruise_btns_msg_canfd, CAN.CAM, values)) - - - if frame % 5 == 0: - lat_active = CC.latActive - if CS.adrv_info_161 is not None: - main_enabled = CS.out.cruiseState.available - cruise_enabled = CC.enabled - lat_enabled = CS.out.latEnabled - nav_active = hud_control.activeCarrot > 1 - - # hdpuse carrot - hdp_use = int(Params().get("HDPuse")) - hdp_active = False - if hdp_use == 1: - hdp_active = cruise_enabled and nav_active - elif hdp_use == 2: - hdp_active = cruise_enabled - # hdpuse carrot - - values = copy.copy(CS.adrv_info_161) - #print("adrv_info_161 = ", CS.adrv_info_161) - - values["SETSPEED"] = (6 if hdp_active else 3 if cruise_enabled else 1) if main_enabled else 0 - values["SETSPEED_HUD"] = (5 if hdp_active else 3 if cruise_enabled else 1) if main_enabled else 0 - set_speed_in_units = hud_control.setSpeed * (CV.MS_TO_KPH if CS.is_metric else CV.MS_TO_MPH) - values["vSetDis"] = int(set_speed_in_units + 0.5) - - values["DISTANCE"] = 4 if hdp_active else hud_control.leadDistanceBars - values["DISTANCE_LEAD"] = 2 if cruise_enabled and hud_control.leadVisible else 1 if main_enabled and hud_control.leadVisible else 0 - values["DISTANCE_CAR"] = 3 if hdp_active else 2 if cruise_enabled else 1 if main_enabled else 0 - values["DISTANCE_SPACING"] = 5 if hdp_active else 1 if cruise_enabled else 0 - - values["TARGET"] = 1 if main_enabled else 0 - values["TARGET_DISTANCE"] = int(hud_control.leadDistance) - - values["BACKGROUND"] = 6 if CS.paddle_button_prev > 0 else 1 if cruise_enabled else 3 if main_enabled else 7 - values["CENTERLINE"] = 1 if HDA_CntrlModSta > 0 else 0 #lat_enabled else 0 - values["CAR_CIRCLE"] = 2 if hdp_active else 1 if cruise_enabled else 0 - - values["NAV_ICON"] = 2 if nav_active else 0 - values["HDA_ICON"] = 5 if hdp_active else 2 if cruise_enabled else 1 if main_enabled else 0 - values["LFA_ICON"] = 5 if hdp_active else 2 if lat_active else 1 if lat_enabled else 0 - values["LKA_ICON"] = 4 if lat_active else 3 if lat_enabled else 0 - values["FCA_ALT_ICON"] = 0 - - if values["ALERTS_2"] in [1, 2, 5, 6, 10, 21, 22]: # 10,21,22: 운전자모니터 알람/경고, 6: enable lanechange alert - values["ALERTS_2"] = 0 - values["DAW_ICON"] = 0 - - values["SOUNDS_1"] = 0 # 운전자모니터경고음. - values["SOUNDS_2"] = 0 # 2: STEER중지 경고후에도 사운드가 나옴. - values["SOUNDS_4"] = 0 # 차선변경알림? 에이 그냥0으로.. - - if values["ALERTS_3"] in [3, 4, 13, 17, 19, 26, 7, 8, 9, 10]: - values["ALERTS_3"] = 0 - values["SOUNDS_3"] = 0 - - if values["ALERTS_5"] in [1, 2, 4, 5]: - values["ALERTS_5"] = 0 - - if values["ALERTS_5"] in [11] and CS.softHoldActive == 0: - values["ALERTS_5"] = 0 - - curvature = round(CS.out.steeringAngleDeg / 3) - - values["LANELINE_CURVATURE"] = (min(abs(curvature), 15) + (-1 if curvature < 0 else 0)) if lat_active else 0 - values["LANELINE_CURVATURE_DIRECTION"] = 1 if curvature < 0 and lat_active else 0 - - # lane_color = 6 if lat_active else 2 - #lane_color = 2 # 6: green, 2: white, 4: yellow - lane_color = 2 if CS.out.leftLaneLine < 20 else 4 - if hud_control.leftLaneDepart: - values["LANELINE_LEFT"] = 4 if (frame // 50) % 2 == 0 else 1 - else: - values["LANELINE_LEFT"] = lane_color if hud_control.leftLaneVisible else 0 - lane_color = 2 if CS.out.rightLaneLine < 20 else 4 - if hud_control.rightLaneDepart: - values["LANELINE_RIGHT"] = 4 if (frame // 50) % 2 == 0 else 1 - else: - values["LANELINE_RIGHT"] = lane_color if hud_control.rightLaneVisible else 0 - #values["LANELINE_LEFT_POSITION"] = 15 - #values["LANELINE_RIGHT_POSITION"] = 15 - - values["LCA_LEFT_ARROW"] = 2 if CS.out.leftBlinker else 0 - values["LCA_RIGHT_ARROW"] = 2 if CS.out.rightBlinker else 0 - - values["LCA_LEFT_ICON"] = 1 if CS.out.leftBlindspot else 2 - values["LCA_RIGHT_ICON"] = 1 if CS.out.rightBlindspot else 2 - - values["LANE_LEFT"] = 1 if desire in (1, 3) else 0 - values["LANE_RIGHT"] = 1 if desire in (2, 4) else 0 - - ret.append(packer.make_can_msg("ADRV_0x161", CAN.ECAN, values)) - - if CS.adrv_info_200 is not None: - values = copy.copy(CS.adrv_info_200) - values["TauGapSet"] = hud_control.leadDistanceBars - ret.append(packer.make_can_msg("ADRV_0x200", CAN.ECAN, values)) - - if CS.adrv_info_1ea is not None: - values = copy.copy(CS.adrv_info_1ea) - #values["HDA_MODE1"] = 8 - #values["HDA_MODE2"] = 1 - if lane_changing == 3: - values['LEFT_BLINK_HOLD'] = 1 - elif lane_changing == 4: - values['RIGHT_BLINK_HOLD'] = 1 - - _make_ccnc_values(values, CS, lat_active, frame, hud_control) - # values['AUTOLANECHANGE_MSG'] = 1 # 주변 상황을 확인하세요 - # values['AUTOLANECHANGE_MSG'] = 2 # 작동 조건이 아닙니다 - # values['AUTOLANECHANGE_MSG'] = 3 # 주행 차로를 분석중입니다 - # values['AUTOLANECHANGE_MSG'] = 4 # 급커브 구간입니다 - # values['AUTOLANECHANGE_MSG'] = 5 # 주행 중인 차로의 폭이 좁습니다 - # values['AUTOLANECHANGE_MSG'] = 6 # 작동 구간이 아닙니다. - # values['AUTOLANECHANGE_MSG'] = 7 # 비상등이 켜져있습니다 - # values['AUTOLANECHANGE_MSG'] = 8 # 주행속도가 낮습니다 - # values['AUTOLANECHANGE_MSG'] = 9 # 핸들을 잡으십시오 - # values['AUTOLANECHANGE_MSG'] = 10 # 작동 가능한 차로가 아닙니다 - # values['AUTOLANECHANGE_MSG'] = 11 # 핸들 조작이 감지되었습니다. - # 얘는 우측 RPM 게이지에 크게 나옴 - # values['AUTOLANECHANGE_MSG'] = 12 # ok 버튼을 누르면 차로변경 보조기능이 켜집니다 - # values['AUTOLANECHANGE_MSG'] = 13 # 없음. - # values['AUTOLANECHANGE_MSG'] = 14 # 없음. - # values['AUTOLANECHANGE_MSG'] = 15 # 없음. - ret.append(packer.make_can_msg("ADRV_0x1ea", CAN.ECAN, values)) - - if CS.adrv_info_162 is not None: - values = copy.copy(CS.adrv_info_162) - if hud_control.leadDistance > 0: - values["FF_DISTANCE"] = hud_control.leadDistance - #values["FF_DETECT"] = 11 if hud_control.leadRelSpeed > -0.1 else 12 # bicycle - #values["FF_DETECT"] = 5 if hud_control.leadRelSpeed > -0.1 else 6 # truck - ff_type = 3 if hud_control.leadRadar == 1 else 13 - values["FF_DETECT"] = ff_type if hud_control.leadRelSpeed > -0.1 else ff_type + 1 - #values["FF_DETECT_LAT"] = - hud_control.leadDPath - _make_ccnc_values(values, CS, lat_active, frame, hud_control, lane_line = False, corner_radar= True) - - #values["FAULT_FCA"] = 0 - #values["FAULT_LSS"] = 0 - #values["FAULT_LFA"] = 0 - #values["FAULT_LCA"] = 0 - #values["FAULT_DAS"] = 0 - #values["FAULT_HDA"] = 0 - - if (left_lane_warning and not CS.out.leftBlinker) or (right_lane_warning and not CS.out.rightBlinker): - values["VIBRATE"] = 1 - ret.append(packer.make_can_msg("CCNC_0x162", CAN.ECAN, values)) - - if enable_corner_radar > 0: - if HDA_CntrlModSta == 0: - if frame % 500 in [10,20,30]: - values = { - 'BYTE_1': 0, - 'BYTE_2': 0, - 'BYTE_3': 0x80, - 'BYTE_4': 0x8A, - 'BYTE_5': 0x32, - 'BYTE_6': 0x30, - 'BYTE_7': 0x01, - 'BYTE_8': 0x00, - } - ret.append(packer.make_can_msg("NEW_MSG_4B9", CAN.CAM, values)) - elif frame % 500 in [40,50,60]: - values = { - 'BYTE_1': 0xff, - 'BYTE_2': 0xff, - 'BYTE_3': 0xff, - 'BYTE_4': 0xff, - 'BYTE_5': 0xff, - 'BYTE_6': 0xff, - 'BYTE_7': 0xff, - 'BYTE_8': 0xff, - } - ret.append(packer.make_can_msg("NEW_MSG_4B9", CAN.CAM, values)) - if False: #canfd_debug > 1 and frame % 20 == 0: # 아직 시험중.. - if CS.hda_info_4a3 is not None: - values = copy.copy(CS.hda_info_4a3) - values["LinkClass"] = 1 - values["SPEED_LIMIT"] = 100 - ret.append(packer.make_can_msg("HDA_INFO_4A3", CAN.CAM, values)) - - return ret -""" - def create_adrv_messages(CP, packer, CAN, frame): # messages needed to car happy after disabling # the ADAS Driving ECU to do longitudinal control @@ -911,18 +645,20 @@ def create_ccnc_messages(CP, packer, CAN, frame, CC, CS, hud_control, if CP.flags & HyundaiFlags.CAMERA_SCC.value: HDA_CntrlModSta = 0 - if CS.lfahda_cluster_info is not None: - HDA_CntrlModSta = CS.lfahda_cluster_info["HDA_CntrlModSta"] + HDA_LFA_SymSta = 0 + if CS.lfahda_cluster is not None: + HDA_CntrlModSta = CS.lfahda_cluster["HDA_CntrlModSta"] + HDA_LFA_SymSta = CS.lfahda_cluster["HDA_LFA_SymSta"] if frame % 2 == 0: - #if CS.adrv_info_160 is not None: - # values = copy.copy(CS.adrv_info_160) + #if CS.adrv_0x160 is not None: + # values = copy.copy(CS.adrv_0x160) # ret.append(packer.make_can_msg("ADRV_0x160", CAN.ECAN, values)) if CS.cruise_buttons_msg is not None: values = copy.copy(CS.cruise_buttons_msg) - if CS.lfahda_cluster_info["HDA_LFA_SymSta"] == 0 and 0 < frame % 200 < 12: + if HDA_LFA_SymSta == 0 and 0 < frame % 200 < 12: values["LFA_BTN"] = 1 if CC.enabled and CS.MainMode_ACC: @@ -939,7 +675,7 @@ def create_ccnc_messages(CP, packer, CAN, frame, CC, CS, hud_control, if frame % 5 == 0: lat_active = CC.latActive - if CS.adrv_info_161 is not None: + if CS.adrv_0x161 is not None: main_enabled = CS.out.cruiseState.available cruise_enabled = CC.enabled lat_enabled = CS.out.latEnabled @@ -954,8 +690,8 @@ def create_ccnc_messages(CP, packer, CAN, frame, CC, CS, hud_control, hdp_active = cruise_enabled # hdpuse carrot - values = copy.copy(CS.adrv_info_161) - + values = copy.copy(CS.adrv_0x161) + rx_counter = values.pop("COUNTER", None) values["SETSPEED"] = (6 if hdp_active else 3 if cruise_enabled else 1) if main_enabled else 0 values["SETSPEED_HUD"] = (5 if hdp_active else 3 if cruise_enabled else 1) if main_enabled else 0 @@ -989,7 +725,7 @@ def create_ccnc_messages(CP, packer, CAN, frame, CC, CS, hud_control, values["SOUNDS_2"] = 0 values["SOUNDS_4"] = 0 - if values["ALERTS_3"] in [3, 4, 13, 17, 19, 26, 7, 8, 9, 10]: + if values["ALERTS_3"] in [3, 4, 11, 12, 13, 14, 17, 19, 26, 7, 8, 9, 10]: # hide gap distance msg.(11,12,13,14) values["ALERTS_3"] = 0 values["SOUNDS_3"] = 0 @@ -1004,13 +740,15 @@ def create_ccnc_messages(CP, packer, CAN, frame, CC, CS, hud_control, values["LANELINE_CURVATURE"] = (min(abs(curvature), 15) + (-1 if curvature < 0 else 0)) if lat_active else 0 values["LANELINE_CURVATURE_DIRECTION"] = 1 if curvature < 0 and lat_active else 0 - lane_color = 4 if CS.out.leftLaneLine >= 20 or CS.out.leftBlindspot else 2 + lane_color = 6 if md is not None and md.meta.laneChangeAvailableLeft else 2 + lane_color = 4 if CS.out.leftLaneLine >= 20 or CS.out.leftBlindspot else lane_color if hud_control.leftLaneDepart: values["LANELINE_LEFT"] = 4 if (frame // 50) % 2 == 0 else 1 else: values["LANELINE_LEFT"] = lane_color if hud_control.leftLaneVisible else 0 - lane_color = 4 if CS.out.rightLaneLine >= 20 or CS.out.rightBlindspot else 2 + lane_color = 6 if md is not None and md.meta.laneChangeAvailableRight else 2 + lane_color = 4 if CS.out.rightLaneLine >= 20 or CS.out.rightBlindspot else lane_color if hud_control.rightLaneDepart: values["LANELINE_RIGHT"] = 4 if (frame // 50) % 2 == 0 else 1 else: @@ -1025,16 +763,17 @@ def create_ccnc_messages(CP, packer, CAN, frame, CC, CS, hud_control, values["LANE_LEFT"] = 1 if desire in (1, 3) else 0 values["LANE_RIGHT"] = 1 if desire in (2, 4) else 0 - ret.append(packer.make_can_msg("ADRV_0x161", CAN.ECAN, values)) + ret.append(packer.make_can_msg("ADRV_0x161", CAN.ECAN, values, rx_counter = rx_counter)) - if CS.adrv_info_200 is not None: - values = copy.copy(CS.adrv_info_200) + if CS.adrv_0x200 is not None: + values = copy.copy(CS.adrv_0x200) + rx_counter = values.pop("COUNTER", None) values["TauGapSet"] = hud_control.leadDistanceBars - ret.append(packer.make_can_msg("ADRV_0x200", CAN.ECAN, values)) - - if CS.adrv_info_1ea is not None: - values = copy.copy(CS.adrv_info_1ea) + ret.append(packer.make_can_msg("ADRV_0x200", CAN.ECAN, values, rx_counter = rx_counter)) + if CS.adrv_0x1ea is not None: + values = copy.copy(CS.adrv_0x1ea) + rx_counter = values.pop("COUNTER", None) # blinker hold values['LEFT_BLINK_HOLD'] = 1 if lane_changing == 3 else 0 values['RIGHT_BLINK_HOLD'] = 1 if lane_changing == 4 else 0 @@ -1050,10 +789,10 @@ def create_ccnc_messages(CP, packer, CAN, frame, CC, CS, hud_control, blink_t=1.0 ) - ret.append(packer.make_can_msg("ADRV_0x1ea", CAN.ECAN, values)) + ret.append(packer.make_can_msg("ADRV_0x1ea", CAN.ECAN, values, rx_counter = rx_counter)) - if CS.adrv_info_162 is not None: - values = copy.copy(CS.adrv_info_162) + if CS.ccnc_0x162 is not None: + values = copy.copy(CS.ccnc_0x162) if hud_control.leadDistance > 0: values["FF_DISTANCE"] = hud_control.leadDistance diff --git a/selfdrive/carrot/carrot_controls.py b/selfdrive/carrot/carrot_controls.py new file mode 100644 index 0000000000..d709876e85 --- /dev/null +++ b/selfdrive/carrot/carrot_controls.py @@ -0,0 +1,40 @@ +from openpilot.common.realtime import DT_CTRL +from openpilot.common.params import Params + +class CarrotControls: + def __init__(self, CP): + self.CP = CP + self.params = Params() + self.lat_suspend_active = False + self.lat_suspend_enter_t = 0.0 + self.lat_suspend_hold_t = 0.0 + + def lat_suspend_control(self, CS, latActive): + suspend_angle = float(self.params.get_int("LatSuspendAngleDeg")) + resume_angle = 15 + delay_sec = 1.0 + hold_sec = 0.5 + + # 1) enter condition timer + enter_cond = CS.steeringPressed and abs(CS.steeringAngleDeg) > suspend_angle + if not self.lat_suspend_active: + if enter_cond: + self.lat_suspend_enter_t += DT_CTRL + if self.lat_suspend_enter_t >= delay_sec: + self.lat_suspend_active = True + self.lat_suspend_hold_t = 0.0 + else: + self.lat_suspend_enter_t = 0.0 + + # 2) while suspended: enforce minimum hold time + hysteresis exit + if self.lat_suspend_active: + self.lat_suspend_hold_t += DT_CTRL + + exit_cond = (abs(CS.steeringAngleDeg) < resume_angle) and (not CS.steeringPressed) + if (self.lat_suspend_hold_t >= hold_sec) and exit_cond: + self.lat_suspend_active = False + self.lat_suspend_enter_t = 0.0 + + if self.lat_suspend_active: + latActive = False + return latActive \ No newline at end of file diff --git a/selfdrive/carrot/carrot_man.py b/selfdrive/carrot/carrot_man.py index 87869dc030..286154c972 100644 --- a/selfdrive/carrot/carrot_man.py +++ b/selfdrive/carrot/carrot_man.py @@ -260,55 +260,7 @@ def get_local_ip(self): except Exception as e: return f"Error: {e}" - def register_my_ip(self): - try: - token = "12345678" - local_ip = self.get_local_ip() - version = self.params.get("Version") - github_id = self.params.get("GithubUsername") - port = 7000 - is_onroad = self.params.get_bool("IsOnroad") - ts = int(time.time()) - url = "https://shind0.synology.me/carrot/api_heartbeat.php" - timeout_s = 3.5 - payload = { - "github_id": github_id, - "token": token, - "local_ip": local_ip, - "port": int(port), - "version": version, - "is_onroad": bool(is_onroad), - "ts": int(time.time()), - } - #if extra: - # payload.update(extra) - - data = json.dumps(payload).encode("utf-8") - print(data) - req = urllib.request.Request( - url=url, - data=data, - headers={"Content-Type": "application/json"}, - method="POST", - ) - - try: - ctx = ssl._create_unverified_context() - with urllib.request.urlopen(req, timeout=timeout_s, context=ctx) as resp: - body = resp.read().decode("utf-8", errors="replace") - # 서버가 {"ok":true} 같은 JSON을 주는 경우가 많음 - return (200 <= resp.status < 300), body - except urllib.error.HTTPError as e: - try: - body = e.read().decode("utf-8", errors="replace") - except Exception: - body = "" - return False, f"HTTPError {e.code}: {body}" - except Exception as e: - return False, f"Exception: {e}" - except Exception as e: - print(f"register_my_ip error: {e}") - + # 브로드캐스트 메시지 전송 def broadcast_version_info(self): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -355,9 +307,6 @@ def broadcast_version_info(self): if carrot_speed_active_count > 0: self.carrot_speed_serv(carrot_speed, frame) - if frame % (20 * 30) == 0: - ok, msg = self.register_my_ip() - print(f"[heartbeat] ok: {ok}, msg: {msg}") if frame % 20 == 0 or remote_addr is not None: try: self.broadcast_ip = self.get_broadcast_address() if remote_addr is None else remote_addr[0] diff --git a/selfdrive/carrot/carrot_server.py b/selfdrive/carrot/carrot_server.py index b8cdc3f144..e51844853a 100644 --- a/selfdrive/carrot/carrot_server.py +++ b/selfdrive/carrot/carrot_server.py @@ -30,6 +30,11 @@ from cereal import messaging from opendbc.car import structs import shlex +import shutil +import socket +import urllib.request +import urllib.error +import ssl BASE_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -82,6 +87,93 @@ async def log_mw(request, handler): WEBRTCD_URL = "http://127.0.0.1:5001/stream" +def _get_local_ip() -> str: + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(("8.8.8.8", 80)) + return s.getsockname()[0] + except Exception: + # fallback: hostname 방식(가끔 127.0.1.1 나올 수 있음) + try: + return socket.gethostbyname(socket.gethostname()) + except Exception: + return "0.0.0.0" + + +def _register_my_ip_sync(params: "Params") -> tuple[bool, str]: + """ + 기존 carrot_man.py의 register_my_ip()를 그대로 옮긴 버전 (동기) + """ + try: + token = "12345678" + local_ip = _get_local_ip() + version = params.get("Version") + github_id = params.get("GithubUsername") + port = 7000 + is_onroad = params.get_bool("IsOnroad") + url = "https://shind0.synology.me/carrot/api_heartbeat.php" + timeout_s = 3.5 + + payload = { + "github_id": github_id, + "token": token, + "local_ip": local_ip, + "port": int(port), + "version": version, + "is_onroad": bool(is_onroad), + "ts": int(time.time()), + } + + data = json.dumps(payload).encode("utf-8") + req = urllib.request.Request( + url=url, + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + + ctx = ssl._create_unverified_context() + with urllib.request.urlopen(req, timeout=timeout_s, context=ctx) as resp: + body = resp.read().decode("utf-8", errors="replace") + return (200 <= resp.status < 300), body + + except urllib.error.HTTPError as e: + try: + body = e.read().decode("utf-8", errors="replace") + except Exception: + body = "" + return False, f"HTTPError {e.code}: {body}" + except Exception as e: + return False, f"Exception: {e}" + + +async def heartbeat_loop(app: web.Application): + """ + aiohttp startup에서 create_task로 돌릴 백그라운드 루프 + - 이벤트 루프 블로킹 방지 위해 to_thread 사용 + """ + if not HAS_PARAMS: + app["hb_last"] = {"ok": False, "msg": "Params not available"} + return + + params = Params() + interval_s = 30.0 # 기존: frame%(20*30) = 30초 + while True: + try: + ok, msg = await asyncio.to_thread(_register_my_ip_sync, params) + app["hb_last"] = { + "ok": bool(ok), + "msg": str(msg)[:800], + "ts": time.time(), + "local_ip": _get_local_ip(), + } + # 원하면 로그 + # print(f"[heartbeat] ok:{ok}, msg:{msg}") + except asyncio.CancelledError: + break + except Exception as e: + app["hb_last"] = {"ok": False, "msg": f"Exception: {e}", "ts": time.time()} + await asyncio.sleep(interval_s) async def proxy_stream(request: web.Request) -> web.StreamResponse: @@ -102,10 +194,24 @@ async def proxy_stream(request: web.Request) -> web.StreamResponse: except Exception as e: return web.json_response({"ok": False, "error": str(e)}, status=502) +async def api_heartbeat_status(request: web.Request) -> web.Response: + return web.json_response({"ok": True, "hb": request.app.get("hb_last")}) + async def on_startup(app: web.Application): app["http"] = ClientSession() - + app["hb_last"] = {"ok": None, "msg": "not yet", "ts": 0} + if HAS_PARAMS: + app["hb_task"] = asyncio.create_task(heartbeat_loop(app)) + async def on_cleanup(app: web.Application): + t = app.get("hb_task") + if t: + t.cancel() + try: + await t + except Exception: + pass + sess = app.get("http") if sess: await sess.close() @@ -557,13 +663,21 @@ def run(cmd: List[str], cwd: Optional[str] = None) -> Tuple[int, str]: for pth in paths: if not os.path.isdir(pth): continue - for fn in glob.glob(os.path.join(pth, "*")): + + for name in os.listdir(pth): + full_path = os.path.join(pth, name) try: - os.remove(fn) - deleted += 1 - except Exception: - pass - return web.json_response({"ok": True, "out": f"deleted files: {deleted}"}) + if os.path.isfile(full_path) or os.path.islink(full_path): + os.remove(full_path) + deleted += 1 + elif os.path.isdir(full_path): + shutil.rmtree(full_path) + deleted += 1 + except Exception as e: + print("delete error:", e) + + return web.json_response({"ok": True, "out": f"deleted entries: {deleted}"}) + if action == "send_tmux_log": @@ -830,10 +944,23 @@ async def ws_carstate(request: web.Request) -> web.WebSocketResponse: "apm": " ", } - await ws.send_str(json.dumps(payload)) + try: + await ws.send_str(json.dumps(payload)) + except (asyncio.CancelledError, GeneratorExit): + raise + except (ConnectionResetError, BrokenPipeError, web.HTTPException): + break + except Exception as e: + # aiohttp에서 클라이언트가 끊길 때 나는 대표 예외 + if isinstance(e, (aiohttp.client_exceptions.ClientConnectionResetError,)): + break + if "Cannot write to closing transport" in str(e): + break + # traceback.print_exc() + break await asyncio.sleep(0.1) # 10Hz except Exception: - #traceback.print_exc() + traceback.print_exc() pass try: diff --git a/selfdrive/carrot_settings.json b/selfdrive/carrot_settings.json index 1032264a11..7400fcfa6e 100644 --- a/selfdrive/carrot_settings.json +++ b/selfdrive/carrot_settings.json @@ -327,7 +327,7 @@ "title": "LatSmoothSec(13)x0.01", "descr": "조향필터링값\n 높으면 부드러운조향", "egroup": "LAT", - "etitle": "LatSmoothSec(30)", + "etitle": "LatSmoothSec(13)", "edescr": "Lat smoothing values — Higher values smoother", "min": 1, "max": 30, @@ -337,6 +337,22 @@ "ctitle": "转向平滑时间(30)", "cdescr": "转向平滑值 — 值越高越平滑" }, + { + "group": "조향튜닝", + "name": "LatSuspendAngleDeg", + "title": "자동조향일시중지각도(300)", + "descr": "조향각이 커지면 자동조향을 일시중지. 15도 이내도 조향각이 돌아오면 재개함.", + "egroup": "LAT", + "etitle": "AutoSteering Suspend Angle(300)", + "edescr": "Auto steering pauses at large steering angles and resumes when it returns within 15.", + "min": 45, + "max": 300, + "default": 300, + "unit": 10 + "cgroup": "转向", + "ctitle": "AutoSteering Suspend Angle(300)", + "cdescr": "Auto steering pauses at large steering angles and resumes when it returns within 15." + }, { "group": "크루즈", "name": "CruiseOnDist", diff --git a/selfdrive/controls/controlsd.py b/selfdrive/controls/controlsd.py index 5093c3246b..8742c2f438 100644 --- a/selfdrive/controls/controlsd.py +++ b/selfdrive/controls/controlsd.py @@ -28,6 +28,8 @@ from selfdrive.modeld.modeld import LAT_SMOOTH_SECONDS from openpilot.selfdrive.locationd.helpers import PoseCalibrator, Pose +from openpilot.selfdrive.carrot.carrot_controls import CarrotControls + State = log.SelfdriveState.OpenpilotState LaneChangeState = log.LaneChangeState LaneChangeDirection = log.LaneChangeDirection @@ -73,6 +75,7 @@ def __init__(self) -> None: self.LaC = LatControlPID(self.CP, self.CI) elif self.CP.lateralTuning.which() == 'torque': self.LaC = LatControlTorque(self.CP, self.CI) + self.carrot_controls = CarrotControls(self.CP) def update(self): self.sm.update(15) @@ -119,6 +122,7 @@ def state_control(self): standstill = abs(CS.vEgo) <= max(self.CP.minSteerSpeed, MIN_LATERAL_CONTROL_SPEED) or CS.standstill CC.latActive = ((self.sm['selfdriveState'].active or lateral_enabled) and CS.latEnabled and not CS.steerFaultTemporary and not CS.steerFaultPermanent and not standstill) + CC.latActive = self.carrot_controls.lat_suspend_control(CS, CC.latActive) CC.longActive = CC.enabled and not any(e.overrideLongitudinal for e in self.sm['onroadEvents']) and self.CP.openpilotLongitudinalControl actuators = CC.actuators diff --git a/selfdrive/controls/lib/desire_helper.py b/selfdrive/controls/lib/desire_helper.py index 21698acef2..aa74865c69 100644 --- a/selfdrive/controls/lib/desire_helper.py +++ b/selfdrive/controls/lib/desire_helper.py @@ -2,103 +2,16 @@ from openpilot.common.conversions import Conversions as CV from openpilot.common.realtime import DT_MDL import numpy as np -from openpilot.selfdrive.modeld.constants import ModelConstants from openpilot.common.params import Params -from collections import deque - -LaneChangeState = log.LaneChangeState -LaneChangeDirection = log.LaneChangeDirection -TurnDirection = log.Desire - -LANE_CHANGE_SPEED_MIN = 30 * CV.KPH_TO_MS -LANE_CHANGE_TIME_MAX = 10. - -BLINKER_NONE = 0 -BLINKER_LEFT = 1 -BLINKER_RIGHT = 2 -BLINKER_BOTH = 3 - -DESIRES = { - LaneChangeDirection.none: { - LaneChangeState.off: log.Desire.none, - LaneChangeState.preLaneChange: log.Desire.none, - LaneChangeState.laneChangeStarting: log.Desire.none, - LaneChangeState.laneChangeFinishing: log.Desire.none, - }, - LaneChangeDirection.left: { - LaneChangeState.off: log.Desire.none, - LaneChangeState.preLaneChange: log.Desire.none, - LaneChangeState.laneChangeStarting: log.Desire.laneChangeLeft, - LaneChangeState.laneChangeFinishing: log.Desire.laneChangeLeft, - }, - LaneChangeDirection.right: { - LaneChangeState.off: log.Desire.none, - LaneChangeState.preLaneChange: log.Desire.none, - LaneChangeState.laneChangeStarting: log.Desire.laneChangeRight, - LaneChangeState.laneChangeFinishing: log.Desire.laneChangeRight, - }, -} - -TURN_DESIRES = { - TurnDirection.none: log.Desire.none, - TurnDirection.turnLeft: log.Desire.turnLeft, - TurnDirection.turnRight: log.Desire.turnRight, -} - - -def calculate_lane_width_frog(lane, current_lane, road_edge): - lane_x, lane_y = np.array(lane.x), np.array(lane.y) - edge_x, edge_y = np.array(road_edge.x), np.array(road_edge.y) - current_x, current_y = np.array(current_lane.x), np.array(current_lane.y) - - lane_y_interp = np.interp(current_x, lane_x[lane_x.argsort()], lane_y[lane_x.argsort()]) - road_edge_y_interp = np.interp(current_x, edge_x[edge_x.argsort()], edge_y[edge_x.argsort()]) - - distance_to_lane = np.mean(np.abs(current_y - lane_y_interp)) - distance_to_road_edge = np.mean(np.abs(current_y - road_edge_y_interp)) - - return min(distance_to_lane, distance_to_road_edge), distance_to_road_edge - - -def calculate_lane_width(lane, lane_prob, current_lane, road_edge): - # t ≈ 1초 앞 기준으로 차선/도로 가장자리까지의 거리 계산 - t = 1.0 - current_lane_y = np.interp(t, ModelConstants.T_IDXS, current_lane.y) - lane_y = np.interp(t, ModelConstants.T_IDXS, lane.y) - distance_to_lane = abs(current_lane_y - lane_y) - - road_edge_y = np.interp(t, ModelConstants.T_IDXS, road_edge.y) - distance_to_road_edge = abs(current_lane_y - road_edge_y) - distance_to_road_edge_far = abs(current_lane_y - np.interp(2.0, ModelConstants.T_IDXS, road_edge.y)) - - lane_valid = lane_prob > 0.5 - return min(distance_to_lane, distance_to_road_edge), distance_to_road_edge, distance_to_road_edge_far, lane_valid - - -class ExistCounter: - """ - 존재 여부가 노이즈처럼 튀는 신호를 히스테리시스로 안정화해 주는 카운터. - - true가 일정 시간 이상 지속되면 counter를 양수로 증가 - - false가 일정 시간 이상 지속되면 counter를 음수로 감소 - """ - def __init__(self): - self.counter = 0 - self.true_count = 0 - self.false_count = 0 - self.threshold = int(0.2 / DT_MDL) # 약 0.2초 이상 지속 시 유효로 판단 - - def update(self, exist_flag: bool): - if exist_flag: - self.true_count += 1 - self.false_count = 0 - if self.true_count >= self.threshold: - self.counter = max(self.counter + 1, 1) - else: - self.false_count += 1 - self.true_count = 0 - if self.false_count >= self.threshold: - self.counter = min(self.counter - 1, -1) - return self.true_count + +from openpilot.selfdrive.controls.lib.desire_lib.constants import ( + LaneChangeState, LaneChangeDirection, TurnDirection, + LANE_CHANGE_SPEED_MIN, LANE_CHANGE_TIME_MAX, + BLINKER_NONE, BLINKER_LEFT, BLINKER_RIGHT, + DESIRES, TURN_DESIRES +) +from openpilot.selfdrive.controls.lib.desire_lib.side_state import SideState +from openpilot.selfdrive.controls.lib.desire_lib.maneuver_classifier import classify_maneuver_type class DesireHelper: @@ -106,7 +19,7 @@ def __init__(self): self.params = Params() self.frame = 0 - # Lane change / turn 상태 + # FSM core self.lane_change_state = LaneChangeState.off self.lane_change_direction = LaneChangeDirection.none self.lane_change_timer = 0.0 @@ -114,82 +27,51 @@ def __init__(self): self.lane_change_delay = 0.0 self.maneuver_type = "none" # "none" / "turn" / "lane_change" - # Desire / turn 관련 self.desire = log.Desire.none self.turn_direction = TurnDirection.none self.enable_turn_desires = True self.turn_desire_state = False - self.desire_disable_count = 0 # turn 후 일정 시간 동안 차선변경 금지 - self.turn_disable_count = 0 # steeringAngle 매우 클 때 turn 금지 - - # Lane / road edge 상태 - self.lane_width_left = 0.0 - self.lane_width_right = 0.0 - self.lane_width_left_diff = 0.0 - self.lane_width_right_diff = 0.0 - self.distance_to_road_edge_left = 0.0 - self.distance_to_road_edge_right = 0.0 - self.distance_to_road_edge_left_far = 0.0 - self.distance_to_road_edge_right_far = 0.0 - - self.lane_exist_left_count = ExistCounter() - self.lane_exist_right_count = ExistCounter() - self.lane_width_left_count = ExistCounter() - self.lane_width_right_count = ExistCounter() - self.road_edge_left_count = ExistCounter() - self.road_edge_right_count = ExistCounter() - - self.available_left_lane = False - self.available_right_lane = False - self.available_left_edge = False - self.available_right_edge = False - - self.lane_width_left_queue = deque(maxlen=int(1.0 / DT_MDL)) - self.lane_width_right_queue = deque(maxlen=int(1.0 / DT_MDL)) - - self.lane_available_last = False - self.edge_available_last = False - self.lane_available_trigger = False - self.lane_appeared = False - self.lane_line_info = 0 - - # Blinkers / ATC + self.desire_disable_count = 0 + self.turn_disable_count = 0 + + # per-side states + self.left = SideState("left") + self.right = SideState("right") + + # blinker/ATC state (원본 변수들 유지) self.blinker_ignore = False self.driver_blinker_state = BLINKER_NONE self.carrot_blinker_state = BLINKER_NONE self.carrot_lane_change_count = 0 self.carrot_cmd_index_last = 0 self.atc_type = "" - self.atc_active = 0 # 0: 없음, 1: ATC 동작 중, 2: 운전자와 ATC 상충 + self.atc_active = 0 # 0: 없음, 1: ATC 동작, 2: 충돌 - # Auto lane change 관련 + # auto lane change self.auto_lane_change_enable = False self.next_lane_change = False - self.blindspot_detected_counter = 0 - # Keep pulse + # keep pulse self.keep_pulse_timer = 0.0 - # 파라미터 + # params self.laneChangeNeedTorque = 0 self.laneChangeBsd = 0 self.laneChangeDelay = 0.0 self.modelTurnSpeedFactor = 0.0 - self.model_turn_speed = 0.0 + self.model_turn_speed = 200.0 - # 기타 + # misc self.prev_desire_enabled = False self.desireLog = "" - self.object_detected_count = 0 - # ★ 현재 차선 확률 (Ego 기준 좌/우) - self.cur_left_prob = 1.0 # laneLineProbs[1] - self.cur_right_prob = 1.0 # laneLineProbs[2] - self.current_lane_missing = False + # externally readable flags + self.lane_change_available_left = False + self.lane_change_available_right = False + # ───────────────────────────────────────────── - # Config / Model 관련 + # params/model # ───────────────────────────────────────────── - def _update_params_periodic(self): if self.frame % 100 == 0: self.laneChangeNeedTorque = self.params.get_int("LaneChangeNeedTorque") @@ -206,97 +88,36 @@ def _make_model_turn_speed(self, modeldata): else: self.model_turn_speed = 200.0 - # ───────────────────────────────────────────── - # Lane / Edge 상태 계산 - # ───────────────────────────────────────────── - - def _check_lane_state(self, modeldata): - # 왼쪽: laneLines[0] vs 현재차선 laneLines[1], roadEdges[0] - lane_width_left, self.distance_to_road_edge_left, self.distance_to_road_edge_left_far, lane_prob_left = \ - calculate_lane_width(modeldata.laneLines[0], modeldata.laneLineProbs[0], - modeldata.laneLines[1], modeldata.roadEdges[0]) - - # 오른쪽: laneLines[3] vs 현재차선 laneLines[2], roadEdges[1] - lane_width_right, self.distance_to_road_edge_right, self.distance_to_road_edge_right_far, lane_prob_right = \ - calculate_lane_width(modeldata.laneLines[3], modeldata.laneLineProbs[3], - modeldata.laneLines[2], modeldata.roadEdges[1]) - - # 차선 존재 카운터 업데이트 - self.lane_exist_left_count.update(lane_prob_left) - self.lane_exist_right_count.update(lane_prob_right) - - # 1초 이동 평균 (노이즈 줄이기) - self.lane_width_left_queue.append(lane_width_left) - self.lane_width_right_queue.append(lane_width_right) - - self.lane_width_left = float(np.mean(self.lane_width_left_queue)) - self.lane_width_right = float(np.mean(self.lane_width_right_queue)) - - self.lane_width_left_diff = self.lane_width_left_queue[-1] - self.lane_width_left_queue[0] - self.lane_width_right_diff = self.lane_width_right_queue[-1] - self.lane_width_right_queue[0] - - # 유효 차선/엣지 판단 - min_lane_width = 2.5 - self.lane_width_left_count.update(self.lane_width_left > min_lane_width) - self.lane_width_right_count.update(self.lane_width_right > min_lane_width) - self.road_edge_left_count.update(self.distance_to_road_edge_left > min_lane_width) - self.road_edge_right_count.update(self.distance_to_road_edge_right > min_lane_width) - - available_count = int(0.2 / DT_MDL) - self.available_left_lane = self.lane_width_left_count.counter > available_count - self.available_right_lane = self.lane_width_right_count.counter > available_count - self.available_left_edge = self.road_edge_left_count.counter > available_count and self.distance_to_road_edge_left_far > min_lane_width - self.available_right_edge = self.road_edge_right_count.counter > available_count and self.distance_to_road_edge_right_far > min_lane_width - - self.cur_left_prob = modeldata.laneLineProbs[1] - self.cur_right_prob = modeldata.laneLineProbs[2] - - # ───────────────────────────────────────────── - # 모델 내 desire 상태 (turn 예측 등) - # ───────────────────────────────────────────── - def _check_desire_state(self, modeldata, carstate, maneuver_type): desire_state = modeldata.meta.desireState orientation_rate = abs(modeldata.orientationRate.z[5]) orientation_rate_future = abs(modeldata.orientationRate.z[15]) - # turnLeft + turnRight 확률 - self.turn_desire_state = (desire_state[1] + desire_state[2]) > 0.1 - #if self.turn_desire_state: - # self.desire_disable_count = int(2.0 / DT_MDL) - #else: - # self.desire_disable_count = max(0, self.desire_disable_count - 1) + self.turn_desire_state = (desire_state[1] + desire_state[2]) > 0.1 - # steeringAngle 너무 크면 turn 자체를 일정 시간 막기 if maneuver_type == "turn" and abs(carstate.steeringAngleDeg) > 80 and orientation_rate_future < orientation_rate: self.turn_disable_count = int(10.0 / DT_MDL) else: self.turn_disable_count = max(0, self.turn_disable_count - 1) # ───────────────────────────────────────────── - # Blinkers / ATC 상태 업데이트 + # blinkers/ATC (원본 로직 유지, side 계산은 별개) # ───────────────────────────────────────────── - def _update_driver_blinker(self, carstate): - driver_blinker_state = carstate.leftBlinker * 1 + carstate.rightBlinker * 2 - driver_blinker_changed = driver_blinker_state != self.driver_blinker_state - self.driver_blinker_state = driver_blinker_state + st = carstate.leftBlinker * 1 + carstate.rightBlinker * 2 + changed = st != self.driver_blinker_state + self.driver_blinker_state = st - driver_desire_enabled = driver_blinker_state in [BLINKER_LEFT, BLINKER_RIGHT] + enabled = st in (BLINKER_LEFT, BLINKER_RIGHT) if self.laneChangeNeedTorque < 0: - # 운전자 깜빡이를 무시하고 차선변경 안 하는 설정 - driver_desire_enabled = False - - return driver_blinker_state, driver_blinker_changed, driver_desire_enabled + enabled = False + return st, changed, enabled - def _update_atc_blinker(self, carrotMan, v_ego, driver_blinker_state): - """ - ATC에서 온 turn/lanechange 명령 기반 깜빡이 상태 갱신. - """ + def _update_atc_blinker(self, carrotMan, driver_blinker_state): atc_type = carrotMan.atcType atc_blinker_state = BLINKER_NONE - # ATC 기반 자동 차선변경 유지 시간 + # 유지 카운트는 DesireHelper에서 관리 if self.carrot_lane_change_count > 0: atc_blinker_state = self.carrot_blinker_state elif carrotMan.carrotCmdIndex != self.carrot_cmd_index_last and carrotMan.carrotCmd == "LANECHANGE": @@ -304,36 +125,33 @@ def _update_atc_blinker(self, carrotMan, v_ego, driver_blinker_state): self.carrot_lane_change_count = int(0.2 / DT_MDL) self.carrot_blinker_state = BLINKER_LEFT if carrotMan.carrotArg == "LEFT" else BLINKER_RIGHT atc_blinker_state = self.carrot_blinker_state - elif atc_type in ["turn left", "turn right"]: - # 네비 turn 안내: 속도 조건을 턴 쪽으로 강제 + elif atc_type in ("turn left", "turn right"): if self.atc_active != 2: atc_blinker_state = BLINKER_LEFT if atc_type == "turn left" else BLINKER_RIGHT self.atc_active = 1 self.blinker_ignore = False - elif atc_type in ["fork left", "fork right", "atc left", "atc right"]: - # 분기(lanechange에 가까움) + elif atc_type in ("fork left", "fork right", "atc left", "atc right"): if self.atc_active != 2: - atc_blinker_state = BLINKER_LEFT if atc_type in ["fork left", "atc left"] else BLINKER_RIGHT + atc_blinker_state = BLINKER_LEFT if atc_type in ("fork left", "atc left") else BLINKER_RIGHT self.atc_active = 1 else: self.atc_active = 0 - # 운전자 깜빡이와 ATC 깜빡이가 충돌할 경우 ATC 무효화 + # 충돌 시 ATC 무효 if driver_blinker_state != BLINKER_NONE and atc_blinker_state != BLINKER_NONE and driver_blinker_state != atc_blinker_state: atc_blinker_state = BLINKER_NONE self.atc_active = 2 - atc_desire_enabled = atc_blinker_state in [BLINKER_LEFT, BLINKER_RIGHT] + atc_desire_enabled = atc_blinker_state in (BLINKER_LEFT, BLINKER_RIGHT) - # blinker_ignore일 때는 깜빡이 신호를 잠시 무시 + # blinker_ignore if driver_blinker_state == BLINKER_NONE: self.blinker_ignore = False if self.blinker_ignore: - driver_blinker_state = BLINKER_NONE atc_blinker_state = BLINKER_NONE atc_desire_enabled = False - # ATC 타입이 바뀌었으면 이번 프레임은 무시 (안정화 목적) + # 타입 변경 1프레임 무시 if self.atc_type != atc_type: atc_desire_enabled = False self.atc_type = atc_type @@ -341,186 +159,114 @@ def _update_atc_blinker(self, carrotMan, v_ego, driver_blinker_state): return atc_blinker_state, atc_desire_enabled # ───────────────────────────────────────────── - # Turn / LaneChange 모드 분류 + # per-side processing (핵심: 좌/우 모두 매 프레임 계산) # ───────────────────────────────────────────── + def _process_sides(self, carstate, modeldata, radarState): + # geometry (좌/우) + # left: outer laneLines[0], current laneLines[1], edge[0], cur_prob laneLineProbs[1] + self.left.update_lane_geometry( + modeldata.laneLines[0], modeldata.laneLineProbs[0], + modeldata.laneLines[1], + modeldata.roadEdges[0], + cur_prob=modeldata.laneLineProbs[1], + ) + # right: outer laneLines[3], current laneLines[2], edge[1], cur_prob laneLineProbs[2] + self.right.update_lane_geometry( + modeldata.laneLines[3], modeldata.laneLineProbs[3], + modeldata.laneLines[2], + modeldata.roadEdges[1], + cur_prob=modeldata.laneLineProbs[2], + ) + + # lane line info (HUD용 raw는 기존대로 leftLaneLine/rightLaneLine) + self.left.update_lane_line_info(carstate.leftLaneLine) + self.right.update_lane_line_info(carstate.rightLaneLine) - def _classify_maneuver_type(self, blinker_state, carstate, old_type): - """ - 깜빡이가 들어왔을 때 이번 조작이 turn인지 lane_change인지 분류. - - 너무 복잡하게 가지 않고, 현재 속도/감속/차선상태/모델 turn 상태 기준으로 점수화. - """ - if blinker_state == BLINKER_NONE: - return "none" - - v_kph = carstate.vEgo * CV.MS_TO_KPH - accel = carstate.aEgo - - # 깜빡이 방향에 따라 참조할 lane/edge 선택 - if blinker_state == BLINKER_LEFT: - lane_exist_counter = self.lane_exist_left_count.counter - lane_available = self.available_left_lane - edge_available = self.available_left_edge - lane_prob_side = self.cur_left_prob - edge_dist = self.distance_to_road_edge_left_far - else: - lane_exist_counter = self.lane_exist_right_count.counter - lane_available = self.available_right_lane - edge_available = self.available_right_edge - lane_prob_side = self.cur_right_prob - edge_dist = self.distance_to_road_edge_right_far - - score_turn = 0 - - if v_kph < 30.0: - score_turn += 1 - elif v_kph < 40.0 and accel < -1.0: - score_turn += 1 - - # 차로가 없고, 로드에지도 여유없고.. - if v_kph < 40.0 and not lane_available and not edge_available: - score_turn += 1 - - # 차선이 잘 안 보이거나(교차로/삼거리 등) - if v_kph < 40.0 and lane_exist_counter < int(0.5 / DT_MDL): - score_turn += 1 - - # steeringAngle이 크면 턴에 가깝다고 본다 - #if abs(carstate.steeringAngleDeg) > 45.0: - # score_turn += 1 - - # 모델이 이미 turn을 예측 중이면 가중치 - if self.turn_desire_state: - score_turn += 1 - - # ATC가 turn 안내 중이면 가중치 - if self.atc_type in ["turn left", "turn right"]: - score_turn += 2 - elif self.atc_type in ["fork left", "fork right", "atc left", "atc right"]: - score_turn -= 2 # fork/atc는 lanechange 쪽에 더 가깝게 - - # ★ road edge가 충분히 멀면(교차로/넓은 공간으로 판단) 턴 쪽으로 가산점 - edge_far = edge_dist > 4.0 # 튜닝 포인트 (4~6m 정도가 무난) - #if edge_far: - # score_turn += 1 - - current_lane_missing = lane_prob_side < 0.3 - self.current_lane_missing = current_lane_missing - # 튜닝 포인트: score_turn 임계값 - if score_turn >= 2: - #if current_lane_missing and edge_far: - if edge_far: - return "turn" - else: - return old_type - else: - return "lane_change" + # BSD 설정 + ignore_bsd = (self.laneChangeBsd < 0) + + # obstacles + v_ego = carstate.vEgo + self.left.update_obstacles(v_ego, radarState.leadLeft, carstate.leftBlindspot, ignore_bsd, bsd_hold_sec=2.0) + self.right.update_obstacles(v_ego, radarState.leadRight, carstate.rightBlindspot, ignore_bsd, bsd_hold_sec=2.0) + + # compute available (include BSD+object) + self.left.compute_lane_change_available(lane_line_info_lt_20=(self.left.lane_line_info_raw < 20), ignore_bsd=ignore_bsd) + self.right.compute_lane_change_available(lane_line_info_lt_20=(self.right.lane_line_info_raw < 20), ignore_bsd=ignore_bsd) + + self.left.update_triggers() + self.right.update_triggers() + + # externally readable + self.lane_change_available_left = self.left.lane_change_available + self.lane_change_available_right = self.right.lane_change_available + + def _get_selected_side(self, blinker_state: int) -> SideState: + return self.left if blinker_state == BLINKER_LEFT else self.right # ───────────────────────────────────────────── - # 메인 업데이트 루틴 + # main update # ───────────────────────────────────────────── - def update(self, carstate, modeldata, lateral_active, lane_change_prob, carrotMan, radarState): self.frame += 1 self._update_params_periodic() self._make_model_turn_speed(modeldata) - # 카운터 감소 + # counts self.carrot_lane_change_count = max(0, self.carrot_lane_change_count - 1) self.lane_change_delay = max(0.0, self.lane_change_delay - DT_MDL) - self.blindspot_detected_counter = max(0, self.blindspot_detected_counter - 1) v_ego = carstate.vEgo below_lane_change_speed = v_ego < LANE_CHANGE_SPEED_MIN - # Lane / desire 상태 갱신 - self._check_lane_state(modeldata) + # per-side compute (좌/우 모두) + self._process_sides(carstate, modeldata, radarState) + + # desire state from model self._check_desire_state(modeldata, carstate, self.maneuver_type) - # 운전자 깜빡이 - driver_blinker_state, driver_blinker_changed, driver_desire_enabled = self._update_driver_blinker(carstate) + # blinkers + driver_st, driver_changed, driver_enabled = self._update_driver_blinker(carstate) + atc_st, atc_enabled = self._update_atc_blinker(carrotMan, driver_st) - # BSD 설정 - ignore_bsd = (self.laneChangeBsd < 0) - block_lanechange_bsd = (self.laneChangeBsd == 1) - - # ATC 깜빡이 - atc_blinker_state, atc_desire_enabled = self._update_atc_blinker(carrotMan, v_ego, driver_blinker_state) - - # 최종 깜빡이/Desire enabled 판단 - desire_enabled = driver_desire_enabled or atc_desire_enabled - blinker_state = driver_blinker_state if driver_desire_enabled else atc_blinker_state - - # lane_line_info (HUD용 등) - lane_line_info = carstate.leftLaneLine if blinker_state == BLINKER_LEFT else carstate.rightLaneLine - - # BSD / 주변 차량 감지 - if desire_enabled: - lane_exist_counter = self.lane_exist_left_count.counter if blinker_state == BLINKER_LEFT else self.lane_exist_right_count.counter - lane_available = self.available_left_lane if blinker_state == BLINKER_LEFT else self.available_right_lane - edge_available = self.available_left_edge if blinker_state == BLINKER_LEFT else self.available_right_edge - self.lane_appeared = self.lane_appeared or lane_exist_counter == int(0.2 / DT_MDL) - - radar = radarState.leadLeft if blinker_state == BLINKER_LEFT else radarState.leadRight - side_object_dist = radar.dRel + radar.vLead * 4.0 if radar.status else 255 - object_detected = side_object_dist < v_ego * 3.0 - if object_detected: - self.object_detected_count = max(1, self.object_detected_count + 1) - else: - self.object_detected_count = min(-1, self.object_detected_count - 1) + desire_enabled = driver_enabled or atc_enabled + blinker_state = driver_st if driver_enabled else atc_st - lane_line_info_edge_detect = (lane_line_info % 10 in [0, 5] and self.lane_line_info not in [0, 5]) - self.lane_line_info = lane_line_info % 10 - else: - lane_exist_counter = 0 - lane_available = True - edge_available = True - self.lane_appeared = False - self.lane_available_trigger = False - self.object_detected_count = 0 - lane_line_info_edge_detect = False - self.lane_line_info = lane_line_info % 10 - - # 차선/엣지 기반 lane change 가능 여부 - lane_change_available = (lane_available or edge_available) and lane_line_info < 20 # 20 미만이면 흰색 라인 - - # lane_available 변화 & 폭 변화로 lane_available_trigger 계산 - self.lane_available_trigger = False - if blinker_state == BLINKER_LEFT: - lane_width_diff = self.lane_width_left_diff - distance_to_road_edge = self.distance_to_road_edge_left - lane_width_side = self.lane_width_left - else: - lane_width_diff = self.lane_width_right_diff - distance_to_road_edge = self.distance_to_road_edge_right - lane_width_side = self.lane_width_right + # 선택된 side (FSM은 이 side만 참고) + side = self._get_selected_side(blinker_state) if blinker_state in (BLINKER_LEFT, BLINKER_RIGHT) else None - if lane_width_diff > 0.8 and (lane_width_side < distance_to_road_edge): - self.lane_available_trigger = True - - edge_availabled = (not self.edge_available_last and edge_available) - side_object_detected = self.object_detected_count > -0.3 / DT_MDL - self.lane_appeared = self.lane_appeared and distance_to_road_edge < 4.0 - - # Auto lane change 트리거 - if self.carrot_lane_change_count > 0: - auto_lane_change_blocked = False - auto_lane_change_trigger = lane_change_available + # auto lane change trigger (기존 로직 유지하되 side 기반) + auto_lane_change_trigger = False + if desire_enabled and side is not None: + # carrot_lane_change_count>0이면 강제 허용 + if self.carrot_lane_change_count > 0: + auto_lane_change_trigger = side.lane_change_available + else: + # 기존 조건: edge_available + (trigger or appeared) + not side_object_detected + auto_lane_change_trigger = ( + self.auto_lane_change_enable and + side.edge_available and + (side.lane_available_trigger or side.lane_appeared) and + (not side.side_object_detected) and + (side.bsd_hold_counter == 0) + ) + self.desireLog = ( + f"{side.name}:ALC={self.auto_lane_change_enable}, " + f"L={side.lane_available},E={side.edge_available}, " + f"T={side.lane_available_trigger},A={side.lane_appeared}, " + f"OBJ={side.side_object_detected},BSD={side.bsd_hold_counter>0}" + ) else: - auto_lane_change_blocked = ((atc_blinker_state == BLINKER_LEFT) and (driver_blinker_state != BLINKER_LEFT)) - self.auto_lane_change_enable = self.auto_lane_change_enable and not auto_lane_change_blocked - auto_lane_change_trigger = self.auto_lane_change_enable and edge_available and (self.lane_available_trigger or self.lane_appeared) and not side_object_detected - self.desireLog = f"L:{self.auto_lane_change_enable},{auto_lane_change_blocked},E:{lane_available},{edge_available},A:{self.lane_available_trigger},{self.lane_appeared},{lane_width_diff:.1f},{lane_width_side:.1f},{distance_to_road_edge:.1f}={auto_lane_change_trigger}" + self.auto_lane_change_enable = False + self.next_lane_change = False - # 메인 상태머신 - - # 0) lateral 끊기거나 너무 오래 지속되면 리셋 + # ───────────────────────── FSM ───────────────────────── if not lateral_active or self.lane_change_timer > LANE_CHANGE_TIME_MAX: self.lane_change_state = LaneChangeState.off self.lane_change_direction = LaneChangeDirection.none self.turn_direction = TurnDirection.none self.maneuver_type = "none" - # 1) turn 후 일정시간 동안은 아무 것도 하지 않음 elif self.desire_disable_count > 0: self.lane_change_state = LaneChangeState.off self.lane_change_direction = LaneChangeDirection.none @@ -528,95 +274,108 @@ def update(self, carstate, modeldata, lateral_active, lane_change_prob, carrotMa self.maneuver_type = "none" else: - # 깜빡이 켜져 있을 때, 이번 조작이 turn인지 lane_change인지 먼저 분류 - if desire_enabled: - new_type = self._classify_maneuver_type(blinker_state, carstate, self.maneuver_type) + # classify maneuver type using selected side + if desire_enabled and side is not None: + new_type = classify_maneuver_type( + blinker_state=blinker_state, + carstate=carstate, + side=side, + turn_desire_state=self.turn_desire_state, + atc_type=self.atc_type, + old_type=self.maneuver_type, + ) else: new_type = "none" - # ★ 1) 원래 lane_change였는데 새로 보니 turn 조건 + 차선 없음이면 → 강제 전환 허용 - if self.maneuver_type == "lane_change" and new_type == "turn" and self.lane_change_state not in [LaneChangeState.preLaneChange, LaneChangeState.laneChangeStarting]: - # 차선변경 도중에도 조건 만족 시 턴으로 스위칭 + # switching rules + if self.maneuver_type == "lane_change" and new_type == "turn" and self.lane_change_state not in ( + LaneChangeState.preLaneChange, LaneChangeState.laneChangeStarting + ): self.maneuver_type = "turn" - self.lane_change_state = LaneChangeState.off # FSM 리셋 후 turn 루트로 - # ★ 2) 그 외에는 off/pre 상태에서만 모드 변경 + self.lane_change_state = LaneChangeState.off elif self.lane_change_state in (LaneChangeState.off, LaneChangeState.preLaneChange): self.maneuver_type = new_type - # ─ TURN 모드 처리 ─ - if desire_enabled and self.maneuver_type == "turn" and self.enable_turn_desires: # and not carstate.standstill: + # ─ TURN mode ─ + if desire_enabled and self.maneuver_type == "turn" and self.enable_turn_desires: self.lane_change_state = LaneChangeState.off if self.turn_disable_count > 0: self.turn_direction = TurnDirection.none self.lane_change_direction = LaneChangeDirection.none else: self.turn_direction = TurnDirection.turnLeft if blinker_state == BLINKER_LEFT else TurnDirection.turnRight - # 호환성을 위해 lane_change_direction도 turn과 동일하게 세팅 self.lane_change_direction = self.turn_direction - # ─ Lane Change FSM 처리 ─ + # ─ Lane change FSM ─ else: self.turn_direction = TurnDirection.none - # LaneChangeState.off if self.lane_change_state == LaneChangeState.off: - if desire_enabled and not self.prev_desire_enabled and not below_lane_change_speed: + if desire_enabled and not self.prev_desire_enabled and not below_lane_change_speed and side is not None: self.lane_change_state = LaneChangeState.preLaneChange self.lane_change_ll_prob = 1.0 self.lane_change_delay = self.laneChangeDelay # 맨 끝 차선이 아니면, ATC 자동 차선변경 비활성 - lane_exist_counter_side = self.lane_exist_left_count.counter if blinker_state == BLINKER_LEFT else self.lane_exist_right_count.counter - self.auto_lane_change_enable = False if lane_exist_counter_side > 0 or lane_change_available else True + # (원본 유지: 차선 존재하거나 geom 가능하면 auto off, 아니면 on) + lane_exist_counter_side = side.lane_exist_count.counter + lane_change_available_geom = side.lane_change_available_geom + self.auto_lane_change_enable = False if (lane_exist_counter_side > 0 or lane_change_available_geom) else True self.next_lane_change = False - # LaneChangeState.preLaneChange elif self.lane_change_state == LaneChangeState.preLaneChange: - self.lane_change_direction = LaneChangeDirection.left if blinker_state == BLINKER_LEFT else LaneChangeDirection.right - dir_map = { - LaneChangeDirection.left: (carstate.steeringTorque > 0, carstate.leftBlindspot), - LaneChangeDirection.right: (carstate.steeringTorque < 0, carstate.rightBlindspot), - } - torque_cond, blindspot_cond = dir_map.get(self.lane_change_direction, (False, False)) - torque_applied = carstate.steeringPressed and torque_cond - blindspot_detected = blindspot_cond - - # 차선이 일정시간 이상 안보이면 자동차선변경 허용 - lane_exist_counter_side = self.lane_exist_left_count.counter if blinker_state == BLINKER_LEFT else self.lane_exist_right_count.counter - if not lane_available or lane_exist_counter_side < int(2.0 / DT_MDL): - self.auto_lane_change_enable = True - - # BSD - if blindspot_detected and not ignore_bsd: - self.blindspot_detected_counter = int(1.5 / DT_MDL) - - if not desire_enabled or below_lane_change_speed: + if side is None: self.lane_change_state = LaneChangeState.off self.lane_change_direction = LaneChangeDirection.none else: - # 차선변경 시작 조건 - if (lane_change_available and self.lane_change_delay == 0) or lane_line_info_edge_detect: - if self.blindspot_detected_counter > 0 and not ignore_bsd: - if torque_applied and not block_lanechange_bsd: - self.lane_change_state = LaneChangeState.laneChangeStarting - elif self.laneChangeNeedTorque > 0 or self.next_lane_change: - if torque_applied: - self.lane_change_state = LaneChangeState.laneChangeStarting - elif driver_desire_enabled: - self.lane_change_state = LaneChangeState.laneChangeStarting - elif torque_applied or auto_lane_change_trigger or lane_line_info_edge_detect: - self.lane_change_state = LaneChangeState.laneChangeStarting - - # LaneChangeState.laneChangeStarting + self.lane_change_direction = LaneChangeDirection.left if blinker_state == BLINKER_LEFT else LaneChangeDirection.right + + # torque direction cond + torque_cond = (carstate.steeringTorque > 0) if blinker_state == BLINKER_LEFT else (carstate.steeringTorque < 0) + torque_applied = carstate.steeringPressed and torque_cond + + # BSD config + ignore_bsd = (self.laneChangeBsd < 0) + block_lanechange_bsd = (self.laneChangeBsd == 1) + bsd_active = (side.bsd_hold_counter > 0) and (not ignore_bsd) + + # 차선이 일정시간 이상 안보이면 auto 허용(원본 유지) + if (not side.lane_available) or (side.lane_exist_count.counter < int(2.0 / DT_MDL)): + self.auto_lane_change_enable = True + + if not desire_enabled or below_lane_change_speed: + self.lane_change_state = LaneChangeState.off + self.lane_change_direction = LaneChangeDirection.none + else: + # 차선변경 시작 조건: + # - side.lane_change_available는 BSD+object 포함(요구사항) + # - 하지만 BSD 중에도 torque override 허용해야 하므로, BSD 분기를 별도로 둠(원본 동작 유지) + start_gate = (side.lane_change_available_geom and self.lane_change_delay == 0) or side.lane_line_info_edge_detect + + if start_gate: + if bsd_active: + if torque_applied and (not block_lanechange_bsd): + self.lane_change_state = LaneChangeState.laneChangeStarting + elif self.laneChangeNeedTorque > 0 or self.next_lane_change: + if torque_applied: + self.lane_change_state = LaneChangeState.laneChangeStarting + elif driver_enabled: + # driver blinker면 바로 시작(원본 유지) + # 단, object/bzd 막힘은 side.lane_change_available에서 걸림 + if side.lane_change_available: + self.lane_change_state = LaneChangeState.laneChangeStarting + else: + if torque_applied or auto_lane_change_trigger or side.lane_line_info_edge_detect: + # 여기서는 시작 직전 안전성 체크 + if side.lane_change_available: + self.lane_change_state = LaneChangeState.laneChangeStarting + elif self.lane_change_state == LaneChangeState.laneChangeStarting: - # 원래 차선라인을 0.5초 동안 서서히 fade-out self.lane_change_ll_prob = max(self.lane_change_ll_prob - 2 * DT_MDL, 0.0) if lane_change_prob < 0.02 and self.lane_change_ll_prob < 0.01: self.lane_change_state = LaneChangeState.laneChangeFinishing - # LaneChangeState.laneChangeFinishing elif self.lane_change_state == LaneChangeState.laneChangeFinishing: - # 1초 동안 서서히 lane line 복귀 self.lane_change_ll_prob = min(self.lane_change_ll_prob + DT_MDL, 1.0) if self.lane_change_ll_prob > 0.99: self.lane_change_direction = LaneChangeDirection.none @@ -626,34 +385,36 @@ def update(self, carstate, modeldata, lateral_active, lane_change_prob, carrotMa else: self.lane_change_state = LaneChangeState.off - # lane_change_timer 관리 + # timer if self.lane_change_state in (LaneChangeState.off, LaneChangeState.preLaneChange): self.lane_change_timer = 0.0 else: self.lane_change_timer += DT_MDL - self.lane_available_last = lane_available - self.edge_available_last = edge_available + # commit last per-side + self.left.commit_last() + self.right.commit_last() self.prev_desire_enabled = desire_enabled - # 운전자가 반대 방향으로 강하게 조향하면 해당 차선변경/턴 취소 - steering_pressed_cancel = carstate.steeringPressed and \ - ((carstate.steeringTorque < 0 and blinker_state == BLINKER_LEFT) or - (carstate.steeringTorque > 0 and blinker_state == BLINKER_RIGHT)) + # 반대 방향 토크로 cancel (기존 유지) + steering_pressed_cancel = carstate.steeringPressed and ( + (carstate.steeringTorque < 0 and blinker_state == BLINKER_LEFT) or + (carstate.steeringTorque > 0 and blinker_state == BLINKER_RIGHT) + ) if steering_pressed_cancel and self.lane_change_state != LaneChangeState.off: self.lane_change_direction = LaneChangeDirection.none self.lane_change_state = LaneChangeState.off self.blinker_ignore = True - # 최종 desire 결정 + # final desire if self.turn_direction != TurnDirection.none: self.desire = TURN_DESIRES[self.turn_direction] self.lane_change_direction = self.turn_direction else: self.desire = DESIRES[self.lane_change_direction][self.lane_change_state] - # keep pulse (LaneChangeState.preLaneChange에서 유지) + # keep pulse if self.lane_change_state in (LaneChangeState.off, LaneChangeState.laneChangeStarting): self.keep_pulse_timer = 0.0 elif self.lane_change_state == LaneChangeState.preLaneChange: @@ -662,3 +423,5 @@ def update(self, carstate, modeldata, lateral_active, lane_change_prob, carrotMa self.keep_pulse_timer = 0.0 elif self.desire in (log.Desire.keepLeft, log.Desire.keepRight): self.desire = log.Desire.none + + return self.desire diff --git a/selfdrive/controls/lib/desire_lib/__init__.py b/selfdrive/controls/lib/desire_lib/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/selfdrive/controls/lib/desire_lib/blinker_manager.py b/selfdrive/controls/lib/desire_lib/blinker_manager.py new file mode 100644 index 0000000000..c8d2fe9bbf --- /dev/null +++ b/selfdrive/controls/lib/desire_lib/blinker_manager.py @@ -0,0 +1,106 @@ +from dataclasses import dataclass +from .constants import BLINKER_NONE, BLINKER_LEFT, BLINKER_RIGHT + +@dataclass +class BlinkerOutput: + driver_blinker_state: int + driver_blinker_changed: bool + driver_desire_enabled: bool + + atc_blinker_state: int + atc_desire_enabled: bool + + blinker_state: int + desire_enabled: bool + + +class BlinkerManager: + def __init__(self): + self.driver_blinker_state = BLINKER_NONE + self.carrot_blinker_state = BLINKER_NONE + self.carrot_lane_change_count = 0 + self.carrot_cmd_index_last = 0 + + self.atc_type = "" + self.atc_active = 0 # 0: ����, 1: ATC ����, 2: �浹 + self.blinker_ignore = False + + def tick(self): + self.carrot_lane_change_count = max(0, self.carrot_lane_change_count - 1) + + def update_driver(self, carstate, laneChangeNeedTorque: int): + st = carstate.leftBlinker * 1 + carstate.rightBlinker * 2 + changed = st != self.driver_blinker_state + self.driver_blinker_state = st + + enabled = st in (BLINKER_LEFT, BLINKER_RIGHT) + if laneChangeNeedTorque < 0: + enabled = False + return st, changed, enabled + + def update_atc(self, carrotMan, driver_blinker_state: int): + atc_type = carrotMan.atcType + atc_blinker_state = BLINKER_NONE + + if self.carrot_lane_change_count > 0: + atc_blinker_state = self.carrot_blinker_state + + elif carrotMan.carrotCmdIndex != self.carrot_cmd_index_last and carrotMan.carrotCmd == "LANECHANGE": + self.carrot_cmd_index_last = carrotMan.carrotCmdIndex + self.carrot_lane_change_count = int(0.2 / carrotMan.DT_MDL) if hasattr(carrotMan, "DT_MDL") else 0 + self.carrot_blinker_state = BLINKER_LEFT if carrotMan.carrotArg == "LEFT" else BLINKER_RIGHT + atc_blinker_state = self.carrot_blinker_state + + elif atc_type in ("turn left", "turn right"): + if self.atc_active != 2: + atc_blinker_state = BLINKER_LEFT if atc_type == "turn left" else BLINKER_RIGHT + self.atc_active = 1 + self.blinker_ignore = False + + elif atc_type in ("fork left", "fork right", "atc left", "atc right"): + if self.atc_active != 2: + atc_blinker_state = BLINKER_LEFT if atc_type in ("fork left", "atc left") else BLINKER_RIGHT + self.atc_active = 1 + + else: + self.atc_active = 0 + + # �����ڿ� �浹�ϸ� ATC ��ȿ + if driver_blinker_state != BLINKER_NONE and atc_blinker_state != BLINKER_NONE and driver_blinker_state != atc_blinker_state: + atc_blinker_state = BLINKER_NONE + self.atc_active = 2 + + atc_desire_enabled = atc_blinker_state in (BLINKER_LEFT, BLINKER_RIGHT) + + # ignore ó�� + if driver_blinker_state == BLINKER_NONE: + self.blinker_ignore = False + if self.blinker_ignore: + atc_blinker_state = BLINKER_NONE + atc_desire_enabled = False + + # Ÿ�� �ٲ�� 1������ ����(����ȭ) + if self.atc_type != atc_type: + atc_desire_enabled = False + self.atc_type = atc_type + + return atc_blinker_state, atc_desire_enabled, atc_type + + def run(self, carstate, carrotMan, laneChangeNeedTorque: int): + self.tick() + + driver_st, driver_changed, driver_enabled = self.update_driver(carstate, laneChangeNeedTorque) + atc_st, atc_enabled, _ = self.update_atc(carrotMan, driver_st) + + desire_enabled = driver_enabled or atc_enabled + blinker_state = driver_st if driver_enabled else atc_st + + return BlinkerOutput( + driver_blinker_state=driver_st, + driver_blinker_changed=driver_changed, + driver_desire_enabled=driver_enabled, + atc_blinker_state=atc_st, + atc_desire_enabled=atc_enabled, + blinker_state=blinker_state, + desire_enabled=desire_enabled, + ) diff --git a/selfdrive/controls/lib/desire_lib/constants.py b/selfdrive/controls/lib/desire_lib/constants.py new file mode 100644 index 0000000000..5be90c7347 --- /dev/null +++ b/selfdrive/controls/lib/desire_lib/constants.py @@ -0,0 +1,41 @@ +from cereal import log +from openpilot.common.constants import CV + +LaneChangeState = log.LaneChangeState +LaneChangeDirection = log.LaneChangeDirection +TurnDirection = log.Desire + +LANE_CHANGE_SPEED_MIN = 30 * CV.KPH_TO_MS +LANE_CHANGE_TIME_MAX = 10.0 + +BLINKER_NONE = 0 +BLINKER_LEFT = 1 +BLINKER_RIGHT = 2 +BLINKER_BOTH = 3 + +DESIRES = { + LaneChangeDirection.none: { + LaneChangeState.off: log.Desire.none, + LaneChangeState.preLaneChange: log.Desire.none, + LaneChangeState.laneChangeStarting: log.Desire.none, + LaneChangeState.laneChangeFinishing: log.Desire.none, + }, + LaneChangeDirection.left: { + LaneChangeState.off: log.Desire.none, + LaneChangeState.preLaneChange: log.Desire.none, + LaneChangeState.laneChangeStarting: log.Desire.laneChangeLeft, + LaneChangeState.laneChangeFinishing: log.Desire.laneChangeLeft, + }, + LaneChangeDirection.right: { + LaneChangeState.off: log.Desire.none, + LaneChangeState.preLaneChange: log.Desire.none, + LaneChangeState.laneChangeStarting: log.Desire.laneChangeRight, + LaneChangeState.laneChangeFinishing: log.Desire.laneChangeRight, + }, +} + +TURN_DESIRES = { + TurnDirection.none: log.Desire.none, + TurnDirection.turnLeft: log.Desire.turnLeft, + TurnDirection.turnRight: log.Desire.turnRight, +} diff --git a/selfdrive/controls/lib/desire_lib/hysteresis.py b/selfdrive/controls/lib/desire_lib/hysteresis.py new file mode 100644 index 0000000000..b1eabc30ad --- /dev/null +++ b/selfdrive/controls/lib/desire_lib/hysteresis.py @@ -0,0 +1,21 @@ +from openpilot.common.realtime import DT_MDL + +class ExistCounter: + def __init__(self, sustain_sec: float = 0.2): + self.counter = 0 + self.true_count = 0 + self.false_count = 0 + self.threshold = int(sustain_sec / DT_MDL) + + def update(self, exist_flag: bool): + if exist_flag: + self.true_count += 1 + self.false_count = 0 + if self.true_count >= self.threshold: + self.counter = max(self.counter + 1, 1) + else: + self.false_count += 1 + self.true_count = 0 + if self.false_count >= self.threshold: + self.counter = min(self.counter - 1, -1) + return self.counter diff --git a/selfdrive/controls/lib/desire_lib/lane_math.py b/selfdrive/controls/lib/desire_lib/lane_math.py new file mode 100644 index 0000000000..0d047df9e6 --- /dev/null +++ b/selfdrive/controls/lib/desire_lib/lane_math.py @@ -0,0 +1,15 @@ +import numpy as np +from openpilot.selfdrive.modeld.constants import ModelConstants + +def calculate_lane_width(lane, lane_prob, current_lane, road_edge): + t = 1.0 + current_lane_y = np.interp(t, ModelConstants.T_IDXS, current_lane.y) + lane_y = np.interp(t, ModelConstants.T_IDXS, lane.y) + distance_to_lane = abs(current_lane_y - lane_y) + + road_edge_y = np.interp(t, ModelConstants.T_IDXS, road_edge.y) + distance_to_road_edge = abs(current_lane_y - road_edge_y) + distance_to_road_edge_far = abs(current_lane_y - np.interp(2.0, ModelConstants.T_IDXS, road_edge.y)) + + lane_valid = lane_prob > 0.5 + return min(distance_to_lane, distance_to_road_edge), distance_to_road_edge, distance_to_road_edge_far, lane_valid diff --git a/selfdrive/controls/lib/desire_lib/maneuver_classifier.py b/selfdrive/controls/lib/desire_lib/maneuver_classifier.py new file mode 100644 index 0000000000..0021af97d7 --- /dev/null +++ b/selfdrive/controls/lib/desire_lib/maneuver_classifier.py @@ -0,0 +1,45 @@ +from openpilot.common.constants import CV +from openpilot.common.realtime import DT_MDL +from .constants import BLINKER_LEFT, BLINKER_RIGHT + +def classify_maneuver_type(blinker_state: int, + carstate, + side, # SideState + turn_desire_state: bool, + atc_type: str, + old_type: str): + if blinker_state == 0: + return "none" + + v_kph = carstate.vEgo * CV.MS_TO_KPH + accel = carstate.aEgo + + score_turn = 0 + if v_kph < 30.0: + score_turn += 1 + elif v_kph < 40.0 and accel < -1.0: + score_turn += 1 + + # ���� ���� edge ������ ������ turn ���� + if v_kph < 40.0 and (not side.lane_available) and (not side.edge_available): + score_turn += 1 + + # ������ �� �� ���̸�(������ ��) + if v_kph < 40.0 and side.lane_exist_count.counter < int(0.5 / DT_MDL): + score_turn += 1 + + if turn_desire_state: + score_turn += 1 + + if atc_type in ("turn left", "turn right"): + score_turn += 2 + elif atc_type in ("fork left", "fork right", "atc left", "atc right"): + score_turn -= 2 + + edge_far = side.dist_to_edge_far > 4.0 + + if score_turn >= 2: + if edge_far: + return "turn" + return old_type + return "lane_change" diff --git a/selfdrive/controls/lib/desire_lib/side_state.py b/selfdrive/controls/lib/desire_lib/side_state.py new file mode 100644 index 0000000000..63a011e49a --- /dev/null +++ b/selfdrive/controls/lib/desire_lib/side_state.py @@ -0,0 +1,156 @@ +from dataclasses import dataclass, field +from collections import deque +from typing import Optional + +import numpy as np +from openpilot.common.realtime import DT_MDL +from openpilot.common.constants import CV + +from .lane_math import calculate_lane_width +from .hysteresis import ExistCounter + + +@dataclass +class SideState: + name: str # "left" / "right" + + # lane/edge distances + lane_width: float = 0.0 + lane_width_diff: float = 0.0 + dist_to_edge: float = 0.0 + dist_to_edge_far: float = 0.0 + + # current lane prob (ego lane line prob on that side) + cur_prob: float = 1.0 + current_lane_missing: bool = False + + # counters + lane_exist_count: ExistCounter = field(default_factory=lambda: ExistCounter(0.2)) + lane_width_count: ExistCounter = field(default_factory=lambda: ExistCounter(0.2)) + edge_count: ExistCounter = field(default_factory=lambda: ExistCounter(0.2)) + + # availability + lane_available: bool = False + edge_available: bool = False + + # smoothing + lane_width_queue: deque = field(default_factory=lambda: deque(maxlen=int(1.0 / DT_MDL))) + + # lane line info + lane_line_info_raw: int = 0 + lane_line_info_mod: int = 0 + last_lane_line_mod: int = 0 + lane_line_info_edge_detect: bool = False + + # transitions + lane_available_last: bool = False + edge_available_last: bool = False + lane_available_trigger: bool = False + lane_appeared: bool = False + + # obstacles + object_detected_count: int = 0 + side_object_detected: bool = False + + # BSD hold (after detection) + bsd_hold_counter: int = 0 + bsd_detected_now: bool = False + + # computed ��lane change available�� (includes BSD+object) + lane_change_available_geom: bool = False + lane_change_available: bool = False + lane_width_sum: float = 0.0 + + def update_lane_geometry(self, + lane_outer, lane_outer_prob, + lane_current, + road_edge, + cur_prob: float): + lane_w, dist_edge, dist_edge_far, lane_valid = calculate_lane_width( + lane_outer, lane_outer_prob, lane_current, road_edge + ) + + self.lane_exist_count.update(bool(lane_valid)) + + # running mean (O(1)) + if len(self.lane_width_queue) == self.lane_width_queue.maxlen: + self.lane_width_sum -= self.lane_width_queue.popleft() + self.lane_width_queue.append(lane_w) + self.lane_width_sum += lane_w + self.lane_width = self.lane_width_sum / len(self.lane_width_queue) + + self.lane_width_diff = (self.lane_width_queue[-1] - self.lane_width_queue[0]) if len(self.lane_width_queue) >= 2 else 0.0 + + self.dist_to_edge = float(dist_edge) + self.dist_to_edge_far = float(dist_edge_far) + + min_lane_width = 2.5 + self.lane_width_count.update(self.lane_width > min_lane_width) + self.edge_count.update(self.dist_to_edge > min_lane_width) + + available_count = int(0.2 / DT_MDL) + self.lane_available = self.lane_width_count.counter > available_count + self.edge_available = (self.edge_count.counter > available_count) and (self.dist_to_edge_far > min_lane_width) + + self.cur_prob = float(cur_prob) + self.current_lane_missing = self.cur_prob < 0.3 + + def update_lane_line_info(self, lane_line_info_raw: int): + self.lane_line_info_raw = int(lane_line_info_raw) + mod = self.lane_line_info_raw % 10 + # edge_detect: 0/5�� �ٲ�� ���� (������ ��/�찡 ���� self.lane_line_info ������ ���׼�) + self.lane_line_info_edge_detect = (mod in (0, 5)) and (self.last_lane_line_mod not in (0, 5)) + self.last_lane_line_mod = mod + self.lane_line_info_mod = mod + + def update_obstacles(self, + v_ego: float, + radar_obj, # radarState.leadLeft / leadRight + blindspot: bool, # carstate.leftBlindspot/rightBlindspot + ignore_bsd: bool, + bsd_hold_sec: float = 2.0): + # object_detected (radar ���) + if radar_obj is not None and radar_obj.status: + d = radar_obj.dRel + v = radar_obj.vLead + side_object_dist = d + v * 4.0 + else: + side_object_dist = 255.0 + + object_detected = side_object_dist < (v_ego * 3.0) + if object_detected: + self.object_detected_count = max(1, self.object_detected_count + 1) + else: + self.object_detected_count = min(-1, self.object_detected_count - 1) + + self.side_object_detected = self.object_detected_count > int(-0.3 / DT_MDL) + + # BSD hold (�䱸����: ���� �� 2�� ����) + self.bsd_detected_now = bool(blindspot) + if self.bsd_detected_now and not ignore_bsd: + self.bsd_hold_counter = int(bsd_hold_sec / DT_MDL) + else: + self.bsd_hold_counter = max(0, self.bsd_hold_counter - 1) + + def compute_lane_change_available(self, lane_line_info_lt_20: bool, ignore_bsd: bool): + # geometric availability + self.lane_change_available_geom = (self.lane_available or self.edge_available) and lane_line_info_lt_20 + + # include bsd/object into lane_change_available (�䱸����) + bsd_active = (self.bsd_hold_counter > 0) and (not ignore_bsd) + self.lane_change_available = self.lane_change_available_geom and (not self.side_object_detected) and (not bsd_active) + + def update_triggers(self): + # lane_available_trigger (���� ���� ����) + self.lane_available_trigger = False + if self.lane_width_diff > 0.8 and (self.lane_width < self.dist_to_edge): + self.lane_available_trigger = True + + # lane_appeared (bugfix: == ���� >=�� �ڿ�������) + # + edge�� �ʹ� �ָ�(������) lane_appeared�� �����ϰ� true�� ������ �ʰ� ���� + appeared_now = self.lane_exist_count.counter >= int(0.2 / DT_MDL) + self.lane_appeared = (self.lane_appeared or appeared_now) and (self.dist_to_edge < 4.0) + + def commit_last(self): + self.lane_available_last = self.lane_available + self.edge_available_last = self.edge_available diff --git a/selfdrive/modeld/modeld.py b/selfdrive/modeld/modeld.py index 5f88796eeb..ffa6ffda70 100755 --- a/selfdrive/modeld/modeld.py +++ b/selfdrive/modeld/modeld.py @@ -419,13 +419,15 @@ def main(demo=False): drivingdata_send.drivingModelData.meta.laneChangeState = DH.lane_change_state drivingdata_send.drivingModelData.meta.laneChangeDirection = DH.lane_change_direction - modelv2_send.modelV2.meta.laneWidthLeft = float(DH.lane_width_left) - modelv2_send.modelV2.meta.laneWidthRight = float(DH.lane_width_right) - modelv2_send.modelV2.meta.distanceToRoadEdgeLeft = float(DH.distance_to_road_edge_left) - modelv2_send.modelV2.meta.distanceToRoadEdgeRight = float(DH.distance_to_road_edge_right) + modelv2_send.modelV2.meta.laneWidthLeft = float(DH.left.lane_width) + modelv2_send.modelV2.meta.laneWidthRight = float(DH.right.lane_width) + modelv2_send.modelV2.meta.distanceToRoadEdgeLeft = float(DH.left.dist_to_edge) + modelv2_send.modelV2.meta.distanceToRoadEdgeRight = float(DH.right.dist_to_edge) modelv2_send.modelV2.meta.desire = DH.desire modelv2_send.modelV2.meta.laneChangeProb = DH.lane_change_ll_prob modelv2_send.modelV2.meta.modelTurnSpeed = float(DH.model_turn_speed) + modelv2_send.modelV2.meta.laneChangeAvailableLeft = DH.lane_change_available_left + modelv2_send.modelV2.meta.laneChangeAvailableRight = DH.lane_change_available_right fill_pose_msg(posenet_send, model_output, meta_main.frame_id, vipc_dropped_frames, meta_main.timestamp_eof, live_calib_seen) pm.send('modelV2', modelv2_send)