forked from SteelRidgeRobotics/2026Rebuilt
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrobot.py
More file actions
177 lines (146 loc) · 6.77 KB
/
robot.py
File metadata and controls
177 lines (146 loc) · 6.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import wpilib
from commands2 import CommandScheduler, Command
from ntcore import NetworkTableInstance
from phoenix6 import SignalLogger
from pykit.loggedrobot import LoggedRobot
from pykit.logger import Logger
from pykit.logreplaysource import LogReplaySource
from pykit.networktables.nt4Publisher import NT4Publisher
from pykit.wpilog.wpilogwriter import WPILOGWriter
from wpilib import DriverStation, Timer
from robot_config import currentRobot, has_subsystem
# Workaround for PyKit: ntcore IntegerPublisher and wpiutil DataLog expect int64,
# but LogTable.getTimestamp() can exceed int64 max. Patch at source to fix all
# receivers (NT4Publisher, WPILOGWriter, etc.).
_INT64_MAX = 2**63 - 1
from pykit.logtable import LogTable
_original_get_timestamp = LogTable.getTimestamp
def _patched_get_timestamp(self):
ts = _original_get_timestamp(self)
if ts > _INT64_MAX:
ts = ts // 1000 # Assume nanoseconds -> microseconds
if ts > _INT64_MAX:
ts = _INT64_MAX
return ts
LogTable.getTimestamp = _patched_get_timestamp
from constants import Constants
from lib import elasticlib
from lib.elasticlib import Notification, NotificationLevel
from robot_container import RobotContainer
import util
class Dwayne(LoggedRobot):
def __init__(self, period = 0.02) -> None:
super().__init__()
Logger.recordMetadata("Robot", currentRobot.name)
Logger.recordMetadata("Mode", Constants.currentMode.name)
# Record subsystem metadata
Logger.recordMetadata("ClimberPresent", str(has_subsystem("climber")))
Logger.recordMetadata("FeederPresent", str(has_subsystem("feeder")))
Logger.recordMetadata("HoodPresent", str(has_subsystem("hood")))
Logger.recordMetadata("IntakePresent", str(has_subsystem("intake")))
Logger.recordMetadata("LauncherPresent", str(has_subsystem("intake")))
Logger.recordMetadata("TurretPresent", str(has_subsystem("turret")))
Logger.recordMetadata("VisionPresent", str(has_subsystem("vision")))
match Constants.currentMode:
# Running on a real robot, log to a USB stick ("/U/logs")
case Constants.Mode.REAL:
deploy_config = wpilib.deployinfo.getDeployData()
if deploy_config is not None:
Logger.recordMetadata(
"Deploy Host", deploy_config.get("deploy-host", "")
)
Logger.recordMetadata(
"Deploy User", deploy_config.get("deploy-user", "")
)
Logger.recordMetadata(
"Deploy Date", deploy_config.get("deploy-date", "")
)
Logger.recordMetadata(
"Code Path", deploy_config.get("code-path", "")
)
Logger.recordMetadata("Git Hash", deploy_config.get("git-hash", ""))
Logger.recordMetadata(
"Git Branch", deploy_config.get("git-branch", "")
)
Logger.recordMetadata(
"Git Description", deploy_config.get("git-desc", "")
)
Logger.addDataReciever(WPILOGWriter())
Logger.addDataReciever(NT4Publisher(True))
# Running a physics simulator, log to NT
case Constants.Mode.SIM:
Logger.addDataReciever(NT4Publisher(True))
# Replaying a log, set up replay source
case Constants.Mode.REPLAY:
self.useTiming = False
Logger.setReplaySource(LogReplaySource())
Logger.addDataReciever(WPILOGWriter(None))
# Avoid CAN errors from pykit when no PDH/PDP is on the bus (or wrong module ID)
util._install_safe_power_distribution_logging()
# Start PyKit logger
Logger.start()
DriverStation.silenceJoystickConnectionWarning(not DriverStation.isFMSAttached())
self.container = RobotContainer()
SignalLogger.enable_auto_logging(False)
SignalLogger.stop()
wpilib.LiveWindow.disableAllTelemetry()
# We can check ourselves if we're overrunning
# And the warning it prints is laggy, so this makes it only tell us if
# We're REALLY lagging
CommandScheduler.getInstance().setPeriod(1.0)
dashboard_nt = NetworkTableInstance.getDefault().getTable("Elastic")
self._match_time_pub = dashboard_nt.getFloatTopic("Match Time").publish()
self._game_phase_name_pub = dashboard_nt.getStringTopic("Game Phase").publish()
self._game_phase_time_pub = dashboard_nt.getFloatTopic("Phase Time").publish()
def robotPeriodic(self) -> None:
CommandScheduler.getInstance().run()
self._match_time_pub.set(Timer.getMatchTime())
phase_name, phase_time = util.get_game_phase()
self._game_phase_name_pub.set(phase_name)
self._game_phase_time_pub.set(float(phase_time))
if self.container.drivetrain is not None:
self.container._field.setRobotPose(self.container.drivetrain.get_cached_state().pose)
Logger.recordOutput("Components", self.container.get_component_poses())
def _simulationPeriodic(self) -> None:
self.container.fuel_sim.update_sim()
def autonomousInit(self) -> None:
selected_auto = self.container.get_autonomous_command()
if selected_auto is not None:
print(f"Selected Auto: {selected_auto.getName()}")
selected_auto.schedule()
def autonomousPeriodic(self) -> None:
pass
def autonomousExit(self) -> None:
print("Autonomous period ended")
def teleopInit(self) -> None:
print("Teleoperated period started")
command = self.container.get_autonomous_command()
if isinstance(command, Command):
command.cancel()
def teleopExit(self) -> None:
print("Teleoperated period ended")
if DriverStation.isFMSAttached():
elasticlib.send_notification(
Notification(
level=NotificationLevel.INFO.value,
title="Good match!",
description="(again)" if DriverStation.getReplayNumber() > 1 else ""
)
)
def testInit(self):
print("Test period started")
CommandScheduler.getInstance().cancelAll()
def disabledInit(self):
if self.container.vision is not None:
self.container.vision.set_throttle(150)
def disabledExit(self):
if self.container.vision is not None:
self.container.vision.set_throttle(0)
def testExit(self):
print("Test period ended")
def disabledPeriodic(self) -> None:
pass
def teleopPeriodic(self) -> None:
pass
def testPeriodic(self) -> None:
pass