From cae1b5a89fe00577eed72e5d123c223545ba7082 Mon Sep 17 00:00:00 2001 From: johnnykoch02 Date: Sat, 4 Mar 2023 16:09:04 -0500 Subject: [PATCH 1/5] Adding Quick setup To Linux Machine, Installs some dependencies. --- README.md | 5 +++++ Setup/setup_device.py | 24 ++++++++++++++++++++++++ 2 files changed, 29 insertions(+) create mode 100644 Setup/setup_device.py diff --git a/README.md b/README.md index bb98356..3bce41e 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,7 @@ # Jetson Robotic Subsystem Optimized for Computer Vision on NVIDA's Jetson Nano, and developed to pair with the V5 Brain of the IEEE VEX Robotics Team at the University of South Florida + +# Setup +1. Download arm file from https://drive.google.com/drive/folders/1_ZBm6zKnXEisfg1AqtZdvGWZXl8Se4wn +2. Place file in Setup/ +3. Run Setup_Device.py \ No newline at end of file diff --git a/Setup/setup_device.py b/Setup/setup_device.py new file mode 100644 index 0000000..581fa66 --- /dev/null +++ b/Setup/setup_device.py @@ -0,0 +1,24 @@ +import subprocess + +try: +# Arm Installation Ref: https://askubuntu.com/questions/1243252/how-to-install-arm-none-eabi-gdb-on-ubuntu-20-04-lts-focal-fossa + subprocess.check_call(["sudo", "apt", "remove", "gcc-arm-none-eabi"]) + subprocess.check_call(["sudo", "tar", "xjf","gcc-arm-none-eabi-10.3-2021.10-x86_64-linux.tar.bz2", "-C", "/usr/share/"]) + + subprocess.check_call(["sudo", "ln", "-s", "/usr/share/gcc-arm-none-eabi-10.3-2021.10-x86_64-linux/bin/arm-none-eabi-gcc", "/usr/bin/arm-none-eabi-gcc"]) + subprocess.check_call(["sudo", "ln", "-s", "/usr/share/gcc-arm-none-eabi-10.3-2021.10-x86_64-linux/bin/arm-none-eabi-g++", "/usr/bin/arm-none-eabi-g++"]) + subprocess.check_call(["sudo", "ln", "-s", "/usr/share/gcc-arm-none-eabi-10.3-2021.10-x86_64-linux/bin/arm-none-eabi-size", "/usr/bin/arm-none-eabi-size"]) + subprocess.check_call(["sudo", "ln", "-s", "/usr/share/gcc-arm-none-eabi-10.3-2021.10-x86_64-linux/bin/arm-none-eabi-objcopy", "/usr/bin/arm-none-eabi-objcopy"]) + print("Successfuly installed Arm Compilation Tools...") +except subprocess.CalledProcessError as err: + print("Arm Installation may have Failed...") +#Install Jsoncpp +try: + # Compiler Packages + subprocess.check_call(["sudo", "apt", "install", "python3", "python-dev", "g++", "make"]) + # JsonCpp + subprocess.check_call(["sudo", "apt", "install", "libtool", "flex", "bison", "pkg-config", "libjsoncpp-dev"]) + + +except subprocess.CalledProcessError as err: + print("Program Tools/Libraries/Dependencies have failed to install...") \ No newline at end of file From f3db89b36710446ccf22d83a8e91d29c2934cb02 Mon Sep 17 00:00:00 2001 From: TheGuy920 <53882381+TheGuy920@users.noreply.github.com> Date: Thu, 16 Mar 2023 21:10:06 -0400 Subject: [PATCH 2/5] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 3bce41e..00cec1e 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # Jetson -Robotic Subsystem Optimized for Computer Vision on NVIDA's Jetson Nano, and developed to pair with the V5 Brain of the IEEE VEX Robotics Team at the University of South Florida +Robotic Subsystem Optimized for Computer Vision on NVIDA's Jetson Nano, and developed to pair with the V5 Brain of the IEEE TerriBull VEX Robotics Team # Setup 1. Download arm file from https://drive.google.com/drive/folders/1_ZBm6zKnXEisfg1AqtZdvGWZXl8Se4wn 2. Place file in Setup/ -3. Run Setup_Device.py \ No newline at end of file +3. Run Setup_Device.py From da8c9506b2c2fe02a87c446271dcfaa25e28b735 Mon Sep 17 00:00:00 2001 From: "DESKTOP-N9VDHAM\\John" Date: Wed, 26 Apr 2023 07:12:51 -0400 Subject: [PATCH 3/5] Cahnging Data for V5 and Vision --- Serial/VexCommunications.py | 31 +++++++++++--------- main.py | 57 +++++++++++++++++++++++-------------- 2 files changed, 52 insertions(+), 36 deletions(-) diff --git a/Serial/VexCommunications.py b/Serial/VexCommunications.py index 2e8b1d5..220ebd8 100644 --- a/Serial/VexCommunications.py +++ b/Serial/VexCommunications.py @@ -206,7 +206,11 @@ def SendRawBuffer( self, _bytes: str ) -> int: def WaitForTags(self): self.__recieved_tags = False + ctr = 0 while(self.__recieved_tags == False): + ctr+=1 + if ctr % 100 == 0: + print('Watining for tags...') time.sleep(0.1) def __SendTags( self ): @@ -228,11 +232,10 @@ def RecieveTagList( self, array: list ): tag_id = array[1] friendly_name = array[2] self.__extendedTags[tag_id] = CallbackItem(friendly_name, None) - - found = [k for k in self.__callbacks if k is not None and k.friendly_name == friendly_name] - if len(found) > 0: - self.__extendedTags[tag_id].callback = found[0].callback - print(found[0].callback) + found_callback = [i for i in self.__callbacks if i is not None and i.friendly_name == friendly_name] + if found_callback: + self.__extendedTags[tag_id].callback = found_callback[0].callback + print("Recieved: " + friendly_name) @@ -259,7 +262,7 @@ async def __ReadInput( self ): while not self.__stop_token: #self.serial.write(10) for _int in self.serial.read(): - #self.serial.write(10) + # print(_int) packet_length = len(self.__next_packet) #if packet_length - 1 < self.__next_packet[0]: if packet_length >= self.header_length: @@ -288,9 +291,9 @@ def __ParsePacket( self, packet ): print(function_id - self.__packetIndexOffset) print([i.friendly_name for i in self.__callbacks if i is not None]) - # print("================================") - # print("Function Called: \"{}\"".format(item.friendly_name)) - # print("Total Bytes: {}".format(len(packet))) + print("================================") + print("Function Called: \"{}\"".format(item.friendly_name)) + print("Total Bytes: {}".format(len(packet))) packet = packet[self.header_length:-self.footer_length] pindex = 1 @@ -302,12 +305,12 @@ def __ParsePacket( self, packet ): paramaters.append(data.value) pindex += data.next + 1 - #print("Parameters: {}".format(paramaters)) - #diff = (datetime.now() - self.last_date) - # self.last_date = datetime.now() + print("Parameters: {}".format(paramaters)) + diff = (datetime.now() - self.last_date) + self.last_date = datetime.now() - #print("Time Since Last Call: {}".format(diff.total_seconds())) - #print("Parse Time: {}".format(diff.total_seconds() * 1000000)) + print("Time Since Last Call: {}".format(diff.total_seconds())) + print("Parse Time: {}".format(diff.total_seconds() * 1000000)) item.callback(paramaters) def __run_coroutine(self): diff --git a/main.py b/main.py index 702275d..457df12 100644 --- a/main.py +++ b/main.py @@ -10,6 +10,7 @@ import pstats import sys import threading +import datetime DEBUG_VISUALIZE = True # from Testing.Visualize import * @@ -57,24 +58,36 @@ # # tasks.QueueTask(task.GetSteps()) # t = nt -# #pr.disable() -# #ps = pstats.Stats(pr).sort_stats('cumtime') -# #ps.print_stats() -# #break - + #pr.disable() + #ps = pstats.Stats(pr).sort_stats('cumtime') + #ps.print_stats() + #break +''' + Callbacks +''' fcounter = 1 -def test_function( params ): +def SerialTestJetsonToV5( params ): global fcounter print(f"Recieved: {fcounter}") for param in range(len(params)): print(params[param]) fcounter += 1 - + +def SerialTestV5ToJetson( params ): + global fcounter + print(f"Recieved: {fcounter}") + for param in range(len(params)): + print(params[param]) + fcounter += 1 +''' +End Callbacks +''' def Main() -> 0: # global objects # Load serial Communications # comms = Communications() - # comms.RegisterCallback("test_function", test_function) + # comms.RegisterCallback("serial_test_v5_to_jetson", SerialTestV5ToJetson) + comms.RegisterCallback("serial_test_jetson_to_v5", SerialTestJetsonToV5) # comms.Start() # comms.WaitForTags() camera = Vision(False) @@ -82,20 +95,20 @@ def Main() -> 0: while True: camera.CollectObjects() - # sent = 1 - # ctime = time.time() - # deltatime = 0 - # ltime = ctime - # while(True): - # deltatime += (time.time() - ltime) * 1000 - # ltime = time.time() - # #print(deltatime) - # if deltatime > 10: - # deltatime = 0 - # b = comms.SendPacket("TestFunction", 145.48, 541.785, "this is a long test string for testing sending string from the jetson to the v5 brain") - # print(f"Sent: {sent}")#, int((ltime - ctime) * 10) / 10, b) - # sent+=1 - # time.sleep(0.005) + sent = 1 + ctime = time.time() + deltatime = 0 + ltime = ctime + while(True): + deltatime += (time.time() - ltime) * 1000 + ltime = time.time() + #print(deltatime) + if deltatime > 10: + deltatime = 0 + b = comms.SendPacket("TestFunction", 145.48, 541.785, "this is a long test string for testing sending string from the jetson to the v5 brain") + print(f"Sent: {sent}")#, int((ltime - ctime) * 10) / 10, b) + sent+=1 + time.sleep(0.005) # Init Game Object manager # objects = ObjectManager() From 12eefde2db50befde3e8c9e317e6d27d4ac70f3d Mon Sep 17 00:00:00 2001 From: "DESKTOP-N9VDHAM\\John" Date: Wed, 26 Apr 2023 07:13:22 -0400 Subject: [PATCH 4/5] Changes to support Data formatting btw. V5, Vision --- Classes/GameDecision.py | 9 +- Classes/GameObject.py | 43 ++++-- Classes/GameTask.py | 5 - Classes/StaticFieldConfig.py | 47 ++++++ Classes/Vector.py | 53 ++++++- Managers/GameAnalysis.py | 55 ++++++- Managers/GameHasher.py | 7 +- Managers/GameObjects.py | 112 ++++++++++++++- Managers/GameTasks.py | 221 +++++++++++++++++++++++++++-- Managers/UpdateManager.py | 38 +++++ Serial/VexCommunications.py | 14 +- globals.py | 6 + main.py | 268 ++++++++++++++++++++--------------- yolov5 | 1 + 14 files changed, 723 insertions(+), 156 deletions(-) delete mode 100644 Classes/GameTask.py create mode 100644 Classes/StaticFieldConfig.py create mode 100644 Managers/UpdateManager.py create mode 100644 globals.py create mode 160000 yolov5 diff --git a/Classes/GameDecision.py b/Classes/GameDecision.py index 779e6e1..0e8cb94 100644 --- a/Classes/GameDecision.py +++ b/Classes/GameDecision.py @@ -1,7 +1,10 @@ from Classes.GameObject import * class Decision: - def __init__( self ) -> None: - pass - def GetTarget( self ) -> GameObject: + + def __init__( self, target: GameObject, target_type, **kwargs) -> None: + self.target = target + self.target_type = target_type + self.kwargs = kwargs + def Getdecision( self ) -> GameObject: return self.target \ No newline at end of file diff --git a/Classes/GameObject.py b/Classes/GameObject.py index f967d2d..1f377cc 100644 --- a/Classes/GameObject.py +++ b/Classes/GameObject.py @@ -1,20 +1,43 @@ -from enum import Enum +from enum import IntEnum from Classes.Vector import * -class ObjectType(Enum): - FRISBEE = 1 - ROLLER = 2 - HIGH_GOAL = 3 +class ObjectType(IntEnum): + FRISBEE = 0 + ROLLER = 1 + RED_HIGH_GOAL = 2 # BRAIN thinks this is just a high goal + BLUE_HIGH_GOAL = 3 LOW_GOAL = 4 ENEMY_ROBOT = 5 FRIENDLY_ROBOT = 6 - +class RollerColor(IntEnum): + RED = 0 + BLUE = 1 + NEUTRAL = 2 +class GoalColor(IntEnum): + RED = 0 + BLUE = 1 class GameObject: - def __init__(self) -> None: + def __init__(self, pos:Vec, lifespan= 3, type=ObjectType.FRISBEE, **kwargs) -> None: self.id = 0 self.score = 0 - self.type = ObjectType.FRISBEE - self.position = Vec(0,0,0) + self.type = type + self.position = pos self.velocity = Vec(0,0,0) - \ No newline at end of file + self.radial_distance = 0 + self.lifespan = lifespan + self.updated = False + self.ovo = 0 + self.vars = {} + if self.type == ObjectType.FRISBEE: + pass + elif self.type == ObjectType.ROLLER: + self.vars['color'] = kwargs['color'] if 'color' in kwargs else RollerColor.NEUTRAL + self.vars['is_in_contact'] = False + elif self.type == ObjectType.HIGH_GOAL: + self.vars['valid'] = False + self.vars['color'] = kwargs['color'] if 'color' in kwargs else GoalColor.RED + def __getitem__(self, item): + return self.vars[item] if item in self.vars.keys() else None + def getVector2(self) : + return Vector2(r=self.distance, theta=self.ovo) \ No newline at end of file diff --git a/Classes/GameTask.py b/Classes/GameTask.py deleted file mode 100644 index 01f3a59..0000000 --- a/Classes/GameTask.py +++ /dev/null @@ -1,5 +0,0 @@ - -class Task: - def __init__(self) -> None: - pass - \ No newline at end of file diff --git a/Classes/StaticFieldConfig.py b/Classes/StaticFieldConfig.py new file mode 100644 index 0000000..2362cfa --- /dev/null +++ b/Classes/StaticFieldConfig.py @@ -0,0 +1,47 @@ +from Classes.GameObject import * +from Classes.Vector import * + +game_objects_config = [ + { + "pos": Vec(120, 120, 0), + "type": ObjectType.HIGH_GOAL, + "kwargs": { + 'color': GoalColor.RED + } + }, + { + "pos": Vec(24, 24, 24), + "type": ObjectType.HIGH_GOAL, + "kwargs": { + 'color': GoalColor.BLUE + } + }, + { + "pos": Vec(112, 0, 18), + "type": ObjectType.ROLLER, + "kwargs": { + 'color': RollerColor.NEUTRAL + } + }, + { + "pos": Vec(144, 30, 18), + "type": ObjectType.ROLLER, + "kwargs": { + 'color': RollerColor.NEUTRAL + } + }, + { + "pos": Vec(0, 112, 18), + "type": ObjectType.ROLLER, + "kwargs": { + 'color': RollerColor.NEUTRAL + } + }, + { + "pos": Vec(30, 144, 18), + "type": ObjectType.ROLLER, + "kwargs": { + 'color': RollerColor.NEUTRAL + } + }, +] \ No newline at end of file diff --git a/Classes/Vector.py b/Classes/Vector.py index 50c3256..ce3a85a 100644 --- a/Classes/Vector.py +++ b/Classes/Vector.py @@ -1,4 +1,10 @@ import math +import numpy as np + +cos, sin, tan, radians, degrees, arccos, arcsin, arctan, arctan2 = np.cos, np.sin, np.tan, np.radians, np.degrees, np.arccos, np.arcsin, np.arctan2 +PI = 3.14159 +METERS_TO_INCHES = 39.3700787 +EPSILON = 1e-7 class Vec(object): def __init__(self, x, y, z): @@ -64,4 +70,49 @@ def __div__(self, v): return Vec(self.x / v, self.y / v, self.z / v) def __str__(self): - return '[ %.4f, %.4f, %.4f ]' % (self.x, self.y, self.z) \ No newline at end of file + return '[ %.4f, %.4f, %.4f ]' % (self.x, self.y, self.z) + + def toVector2( self ): + return Vector2(x=self.x, y=self.y) + +class Vector2: + '''x and y components OR the length and angle from X-Axis Counter Clockwise in Degrees''' + def __init__(self,x=0, y=0, r=0, theta=0): + if x!=0 or y!=0: + self.x = x + self.y = y + self.r = ((self.x**2 + self.y**2)**0.5) + self.theta = degrees(arctan2(self.y,self.x)) + else: + self.r = r + self.theta = theta + self.x = self.r * cos(radians(theta)) + self.y = self.r * sin(radians(theta)) + + def dot(self, b): + return (self.x*b.x) + (self.y*b.y) + def unit(self) -> 'Vector2': + return Vector2(x=self.x/self.r, y=self.y/self.r) + def scale(self, scalar: float) -> 'Vector2': + return Vector2(x=self.x*scalar, y=self.y*scalar) + def angle_from_dot(a, b): + return degrees(arccos((a.dot(b)) / (a.r * b.r) )) + def __str__(self): + return "i:{}, j:{}, r:{}, theta:{}".format(self.x, self.y, self.r, self.theta) + def __repr__(self): + return "i:{}, j:{}, r:{}, theta:{}".format(self.x, self.y, self.r, self.theta) + def __getitem__(self, idx): + if idx == 0: + return self.x + if idx == 1: + return self.y + else: + return None + def __add__(self, other) -> 'Vector2': + return Vector2(self.x + other.x, self.y + other.y) + def __sub__(self, other) -> 'Vector2': + return Vector2(self.x - other.x, self.y - other.y) + def __mul__(self, scalar: float) -> 'Vector2': + return Vector2(self.x * scalar, self.y * scalar) + def __truediv__(self, scalar: float) -> 'Vector2': + return Vector2(self.x / scalar, self.y / scalar) \ No newline at end of file diff --git a/Managers/GameAnalysis.py b/Managers/GameAnalysis.py index 70481d3..76fd622 100644 --- a/Managers/GameAnalysis.py +++ b/Managers/GameAnalysis.py @@ -1,23 +1,33 @@ from Classes.GameDecision import * +from globals import * from Classes.GameCorrection import * from Managers.GameHasher import * from Managers.GameObjects import * +from Managers.GameTasks import * from concurrent.futures import ThreadPoolExecutor import time - +from typing import List, Dict, Tuple, Union class Analyzer: def __init__(self) -> None: self.__gameStartTime = time.time() * 1000 - self.__hasher = Hasher() + self.__hasher = None pass # Finds the best decision # Add roller blocking - def FindBest( self, game_data: ObjectManager ) -> Decision: + + def FindBest( self, game_data: ObjectManager, task_manager: TaskManager ) -> Decision: hash_list = [] + if self.getMagEstimate(game_data, task_manager) >= 3: # We decide to shoot the disks + return Decision(game_data.GetGoals()[GoalColor(CURRENT_TEAM_COLOR)], Actions.Type.SHOOT_DISK) - bots = game_data.GetBots() frisbees = game_data.GetFrisbees() + closest_frisbee = min(frisbees, key= lambda obj: obj.distance) + if closest_frisbee.distance < 2.5: # Pick up the closest frisbee + return Decision(closest_frisbee, Actions.Type.LOAD_DISK) + + self.__hasher = Hasher(game_data) + bots = game_data.GetBots() rollers = game_data.GetRollers() extra_data = {"bots": bots, "frisbees": frisbees, "rollers": rollers} @@ -29,8 +39,43 @@ def FindBest( self, game_data: ObjectManager ) -> Decision: hash_list.extend(hashedFrisbees) hash_list.extend(hashedRollers) + # actions = [ + # Actions.Type.CAPTURE_ROLLER, + # Actions.Type.DRIVE_TO_OBJECT, + # Actions.Type.LOAD_DISK, + # Actions.Type.SHOOT_DISK, + # Actions.Type.TURN_TO_ANGLE, + # Actions.Type.DRIVE_TO_OBJECT + # ] - return hash_list + # def score_actions(frisbees: List[GameObject], rollers:List[GameObject], object_manager:ObjectManager, task_manager: TaskManager) -> List[Dict[str, Union[int, float]]]: + # dt = self.__GetDeltaTime() + # def CaptureRollerScore(frisbees: List[GameObject], rollers:List[GameObject], object_manager:ObjectManager, task_manager: TaskManager) -> List[Dict[str, Union[int, float]]]: + # kTime = 0.2 + # kDistance = 0.3 + # kOrientation = 0.3 + # cost = lambda dt, distance, orientation, roller: (((55 - (dt/1000) / 55) + kTime) * ((distance*kDistance + orientation*kOrientation) / 12) + int(math.inf*int(roller['color']) == CURRENT_TEAM_COLOR)) # Feet + + # def DriveToObjectScore(frisbees: List[GameObject], object_manager:ObjectManager, task_manager: TaskManager) -> List[Dict[str, Union[int, float]]]: + # kTime = 0.2 + # kDistance = 0.3 + # kOrientation = 0.3 + # kNum0bjs = 0.68 + # cost = lambda distance, orientation, dt, cnt_objs, : int((dt/1000)<45) * ((distance*kDistance + orientation*kOrientation) / 12) * kNum0bjs((3-cnt_objs)%4) + # for frisbee in frisbees: + # total_cost = cost() + # scored_items = for item.id: { + # "score": score, + # "object": item + # } + + + hash_list = sorted(hash_list, key=lambda x: x[next(iter(x.keys()))]['score'], reverse=True) + target_object = hash_list[0]['object'] + if target_object.type == ObjectType.FRISBEE: + return Decision(target_object, Actions.Type.DRIVE_TO_OBJECT) + elif target_object.type == ObjectType.ROLLER: + return Decision (target_object, Actions.Type.CAPTURE_ROLLER, kwargs= {'power': 0.35}) def __ThreadedHashing( self, objects: list, extra: dict) -> list: diff --git a/Managers/GameHasher.py b/Managers/GameHasher.py index 4cd8cbe..396caae 100644 --- a/Managers/GameHasher.py +++ b/Managers/GameHasher.py @@ -1,3 +1,4 @@ +from Managers.GameObjects import * from Classes.GameObject import * ROBOT_SPEED = 100.0 @@ -38,9 +39,9 @@ } class Hasher: - def __init__(self) -> None: - self.robot_velocity = Vec(1,0,0) - pass + def __init__(self, object_manager) -> None: + self.object_manager = object_manager + self.robot_velocity = object_manager.my_robot['velocity'] def GetGenericHash( self, weight_type: ObjectType, weight, delta ) -> float: weights = hash_weights[weight_type] diff --git a/Managers/GameObjects.py b/Managers/GameObjects.py index 78d8f18..ba25673 100644 --- a/Managers/GameObjects.py +++ b/Managers/GameObjects.py @@ -1,7 +1,19 @@ +import time from Classes.GameObject import * +from Serial.VexCommunications import Communications +from Classes.Vector import * +from typing import Dict, List +import math + +def create_game_objects(): + from Classes.StaticFieldConfig import game_objects_config + game_objects = [] + for obj_data in game_objects_config: + game_objects.append(GameObject(obj_data['pos'], obj_data['type'], kwargs=obj_data.get('kwargs', {}), lifespan=math.inf)) + return game_objects class ObjectManager: - def __init__(self) -> None: + def __init__(self, starting_pos, starting_angle, comms: Communications) -> None: # int, GameObject self.frisbees = dict() self.enemies = dict() @@ -9,7 +21,20 @@ def __init__(self) -> None: self.goals = dict() self.rollers = dict() self.target = None - + self.comms = comms + self.__mag_cnt = 0 + self.__my_robot__ = { + 'pos': starting_pos, + 'previous_pos': starting_pos, + 'velocity': Vec(0, 0, 0), + 'theta': starting_angle, + 'previous_theta': starting_angle, + 'angular_velocity': 0, + } + self.ctime = time.time() + self.ltime = self.ctime + self.object_memory = dict() + def __Trace( self, table: dict, obj: GameObject): if obj.id in table.keys(): table.update({obj.id: obj}) @@ -26,10 +51,27 @@ def TraceObject( self, object: GameObject ) -> None: self.__Trace(self.friendly, object) if object.type == ObjectType.ROLLER: self.__Trace(self.rollers, object) - + + def UpdateRobot(self, pos:Vec, theta:float) -> None: + self.ctime = time.time() + previous_pos = self.my_robot['pos'] + new_velocity = (pos - previous_pos) / (self.ctime - self.ltime) + previous_theta = self.my_robot['theta'] + angular_velocity = (theta - previous_theta) / (self.ctime - self.ltime) + self.my_robot = { + 'pos': pos, + 'previous_pos': previous_pos, + 'velocity': new_velocity, + 'theta': theta, + 'previous_theta': previous_theta, + 'angular_velocity': angular_velocity, + } + def GetRobot( self ) -> Dict[str, any]: + return self.__my_robot__ def SetCurrentTarget( self, object: GameObject ) -> None: self.target = object + self.CommsSetTarget() def GetCurrentTarget( self ) -> GameObject: return self.target def SerializeForComs( self ) -> bytearray: @@ -41,10 +83,72 @@ def GetFriendly( self ) -> list: def GetFrisbees( self ) -> list: return self.frisbees.values() def GetGoals( self ) -> dict: - pass + return self.goals def GetBots( self ) -> list: return (self.enemies | self.friendly).values() + def SetMagCnt( self, value: int ) -> None: + self.__mag_cnt = value + def GetMagCnt( self ) -> int: + return self.__mag_cnt def GetRollers( self ) -> list: return self.rollers.values() def GetAll( self ) -> list: return (self.enemies | self.friendly | self.frisbees | self.rollers).values() + def CommsSetTarget( self ) -> None: + returnCode = self.comms.SendPacket("set_target_obj", max(int(self.target.type), 2)) + def reset_updated(self): + for obj in self.GetAll(): + obj.updated = False + def CommsUpdateTargetDrDth( self ) -> None: + if self.target != None: + if self.target.type == ObjectType.FRISBEE: + self.comms.SendPacket("update_target_obj_dr_dtheta", self.target.r, self.target.ovo) + if self.target.type == ObjectType.RED_HIGH_GOAL or self.target.type == ObjectType.BLUE_HIGH_GOAL: + self.comms.SendPacket("update_target_obj_dr_dtheta", self.target.r, self.target.ovo, 0.0) + if self.target.type == ObjectType.ROLLER: #TODO: DO SOME FANCY SHMANCY AUGMENTATION MATH HERE FOR OVO AND R + self.comms.SendPacket("update_target_obj_dr_dtheta", self.target.r, self.target.ovo, self.target['color'], self.target['is_in_contact']) + def PruneObjects( self ) -> None: + for id, obj in self.frisbees.items(): + if not obj.updated: + obj['lifespan'] -= 1 + if obj['lifespan'] <= 0: + del self.frisbees[id] # What if is our target object? + + def ProcessImageData( self, image_data: List[List[float, float, ObjectType]]) -> None: + ''' + DATA FORMAT: [d_r, -d_theta, object_type], + ''' + self.reset_updated() + distance_threshold = 3.0 # Inches + for (d_r, d_theta, object_type) in image_data: + d_theta= -d_theta # <3 + if object_type == ObjectType.FRISBEE: + d_Vector = Vector2(r=d_r, theta=d_theta) + interp_pos = self.__my_robot__['pos'].toVector2() + d_Vector + min_euclidian_distance, id = math.inf, -1 + for i, (id, obj) in enumerate(self.frisbees.items()): + obj_pos = obj.position.toVector2() + euclidean_distance = (interp_pos.x - obj_pos.x) ** 2 + (interp_pos.y - obj_pos.y) ** 2 + if euclidean_distance < min_euclidian_distance: + id, min_euclidian_distance = i, euclidean_distance + if min_euclidian_distance < distance_threshold: + self.frisbees[id].updatePos(Vec(interp_pos.x, interp_pos.y, 0)) + self.frisbees[id].lifespan = 3 + self.frisbees[id].updated = True + else: + self.frisbees[len(self.GetFrisbees())] = GameObject(Vec(interp_pos.x, interp_pos.y, 0), ObjectType.FRISBEE) + if object_type == ObjectType.ROLLER: + d_Vector = Vector2(r=d_r, theta=d_theta) + interp_pos = self.__my_robot__['pos'].toVector2() + d_Vector + min_euclidian_distance, id = math.inf, -1 + for i, (id, obj) in enumerate(self.rollers.items()): + obj_pos = obj.position.toVector2() + euclidean_distance = (interp_pos.x - obj_pos.x) ** 2 + (interp_pos.y - obj_pos.y) ** 2 + print('Euc. Delta on Roller:', euclidean_distance) + if euclidean_distance < min_euclidian_distance: + id, min_euclidian_distance = i, euclidean_distance + if id != -1: + self.rollers[id] + + + self.PruneObjects() \ No newline at end of file diff --git a/Managers/GameTasks.py b/Managers/GameTasks.py index d7de2ec..a0870b9 100644 --- a/Managers/GameTasks.py +++ b/Managers/GameTasks.py @@ -1,13 +1,216 @@ -from Classes.GameTask import * +from typing import Literal, Union, TypeVar, Callable +from globals import * +from Classes.GameObject import * +from Classes.Vector import * +from collections import deque +from enum import IntEnum +from Serial.VexCommunications import Communications -class TaskManager: - def __init__(self) -> None: +class APICaller(object): + class APICalls(IntEnum): + GO_TO_POSITION = 0 #(DR DTHETA) + TURN_TO_ANGLE = 1 #(DTHETA) + SPIN_ROLLER = 2 + TURNOFF_ROLLER = 3 + SHOOT_DISK = 4 + LOAD_SHOOTER = 5 # ( ENSURE TARGET OBJECT SET ) + GO_TO_OBJECT = 6 + SET_TARGET_OBJECT = 7 # (object type) + TAG_MAP = dict({ + APICalls.GO_TO_POSITION: "go_to_pos_dr_dtheta", + APICalls.TURN_TO_ANGLE: "turn_to_angle", + APICalls.SPIN_ROLLER: "spin_roller", + APICalls.TURNOFF_ROLLER: "turnoff_roller", + APICalls.SHOOT_DISK: "shoot_disk", + APICalls.LOAD_SHOOTER: "load_shooter", + APICalls.GO_TO_OBJECT: "go_to_obj", + }) + + @staticmethod + def GetAPITag(tag) -> Union(str, None): + return APICaller.TAG_MAP.get(tag, None) + + def __init__(self, comms:Communications,): + self.__comms__ = comms + def MakeAPICall(self, method, *args) -> int: + return self.__comms__.SendPacket(APICaller.GetAPITag(method), *args) + def SendClearTasks( self ) -> int: + self.__comms__.SendPacket("clear_tasks", 0) + + +class Actions: + class Type(IntEnum): + CAPTURE_ROLLER = 0 # [Drive in Forward Direction torwards Roller, Turn 180 Deg., Turn On Roller, Finish Driving until In Contact, Wait For Color Target, Send Stop Signal.] + DRIVE_TO_TARGET = 1 # [Send over DrDtheta, wait for Task to Complete] + LOAD_DISK = 2 # [Set Target Object, Go To Object, Wait For Task to Complete] + SHOOT_DISK = 3 # [Set Target Object to Goal, Funnel Update Info (automatic), Wait For Task to Complete] + TURN_TO_ANGLE = 4 # [Set Dtheta, Wait For Task to Complete] + DRIVE_TO_OBJECT = 5 # Set Target Object, Go To Object, Wait For Task to Complete + def __init__(self, target_object:GameObject, action_type:Type, **kwargs): + self.vars = {} + self.type = action_type + self.isFinished = None # DEFAULT is None such that if a Task has a isFinished field, it will be used instead of the TaskManager. + self.seq_indexes_finished = [] + self.seq_indexes_send_kills = [] + self.target_object = target_object + if self.type == Actions.Type.CAPTURE_ROLLER: + self.vars['color'] = RollerColor(CURRENT_TEAM_COLOR) # Capture Color + self.vars['power'] = kwargs['power'] if 'power' in kwargs else 0.35 # as a Percentage of Max [-1: +1] + self.vars['r'] = kwargs['r'] if 'r' in kwargs else 2.5 + self.isFinished = lambda x: self.vars['color'] == x.target_object['color'] or x.target_object['is_in_contact'] + self.seq_indexes_finished.extend([4, 5]) + elif self.type == Actions.Type.DRIVE_TO_TARGET: + self.vars['r'] = kwargs['r'] + self.vars['theta'] = kwargs['theta'] + self.vars['reversed'] = kwargs['reversed'] if 'reversed' in kwargs else False + # self.isFinished = lambda x: x + elif self.type == Actions.Type.LOAD_DISK: + pass # Needs Update Target Object + elif self.type == Actions.Type.SHOOT_DISK: + self.vars['color'] = GoalColor(CURRENT_TEAM_COLOR) + # self.isFinished = lambda x: x + elif self.type == Actions.Type.TURN_TO_ANGLE: + self.vars['theta'] = kwargs['theta'] + self.vars['reversed'] = kwargs['reversed'] if 'reversed' in kwargs else False + # self.isFinished = lambda x: x + elif self.type == Actions.Type.DRIVE_TO_OBJECT: + self.vars['reversed'] = kwargs['reversed'] if 'reversed' in kwargs else False + def __getitem__(self, key): + return self.vars[key] if key in self.vars else None + +''' + Takes in an Action and outputs a List of API Calls associated with that Action. +''' +class LogicBuilder(object): + @staticmethod + def GetAPICalls(action:Actions) -> tuple[list[APICaller.APICalls], list[object]]: + ''' + \ The Get API Calls Function returns a list of API Calls associated with an Action, with associated action arguments to pass to the API Caller. + This Function Relies: + - Action is initialized properly. + - The Target Object is initialized properly. + ''' + if action.type == Actions.Type.CAPTURE_ROLLER: + API_Calls = [ + APICaller.APICalls.SET_TARGET_OBJECT, APICaller.APICalls.GO_TO_OBJECT, APICaller.APICalls.TURN_TO_ANGLE, APICaller.APICalls.SPIN_ROLLER, APICaller.APICalls.GO_TO_POSITION, APICaller.APICalls.TURNOFF_ROLLER # Here We Send Kill of the Goto once in contact. + ] + API_Args = [ + (int(action.target_object.type)), (int(False)), (float(180), float(False)), (float(action['power'])), (int(action['r']), 0, int(True)), (0) + ] + return API_Calls, API_Args + elif action.type == Actions.Type.DRIVE_TO_TARGET: + API_Calls = [ + APICaller.APICalls.GO_TO_POSITION, + ] + API_Args = [ + (float(action['r']), float(action['theta']), int(action['reversed'])) + ] + elif action.type == Actions.Type.LOAD_DISK: + API_Calls = [ + APICaller.APICalls.SET_TARGET_OBJECT, APICaller.APICalls.LOAD_SHOOTER, + ] + API_Args = [ + (int(action.target_object.type)), (0) + ] + elif action.type == Actions.Type.SHOOT_DISK: + API_Calls = [ + APICaller.APICalls.SET_TARGET_OBJECT, APICaller.APICalls.SHOOT_DISK, + ] + API_Args = [ + (int(action.target_object.type)), (0) + ] + elif action.type == Actions.Type.TURN_TO_ANGLE: + API_Calls = [ + APICaller.APICalls.TURN_TO_ANGLE, + ] + API_Args = [ + (float(action['theta']), float(action['reversed'])) + ] + elif action.type == Actions.Type.DRIVE_TO_OBJECT: + API_Calls = [ + APICaller.APICalls.SET_TARGET_OBJECT, APICaller.APICalls.GO_TO_OBJECT, + ] + API_Args = [ + (int(action.target_object.type)), (int(action['reversed'])), + ] + + +class TaskManager(object): + class Task(object): + def __init__(self, api_caller:APICaller, action: Actions): + self.action = action + self.__API_Calls, self.__API_Args = LogicBuilder.GetAPICalls(self.action) + self.sequence_position = 0 + self.sequence_length = len(self.__API_Calls) + self.__API__Caller = api_caller + self.API_call_Sent = False + self.target_object = action.target_object + + def update( self, task_finished ): + if task_finished: + self.sequence_position+=1 + self.API_call_Sent = False + elif self.action.isFinished is not None and self.sequence_position in self.action.seq_indexes_finished and self.action.isFinished(self.action): + self.__API__Caller.SendClearTasks() + + else: + if not self.API_call_Sent: + self.__API__Caller.MakeAPICall(self.__API_Calls[self.sequence_position], *self.__API_Args[self.sequence_position]) + + return True if self.sequence_position >= self.sequence_length else False + + def UpdateTargetObject( self, target_object:GameObject ): + self.target_object = target_object + self.action.target_object = target_object + + def __init__(self, object_manager, comms:Communications,) -> None: + self.object_manager = object_manager + self.current_task_number = 0 + self.current_action = None + self.current_task = None + self.queued_actions = deque() + self.__APICaller__ = APICaller(comms) + self.__total_robot_tasks_completed = 0 + self.__update_task__ = False + + def Initialize( self ) -> None: pass + + def Update( self ) -> None: + ''' + Some Actions may need an update if the majority of the Logic Takes place on the Jetson. + ''' + if self.current_task.target_object != self.object_manager.GetCurrentTarget(): + self.current_task.UpdateTargetObject(self.object_manager.GetCurrentTarget()) + if self.current_task is not None: + if self.current_task.update(self, self.__update_task__): + self.__OnSequenceFinished() + elif self.__update_task__: + self.__update_task__ = False + else: + self.StartNextTask() + def AlmostFinished( self ) -> bool: - return True - def QueueTask( self, step_list ): - pass - def GetCurrentTask( self ) -> Task: - pass + return self.current_task.sequence_position >= (self.current_task.sequence_length - 1) + def QueueAction( self, action:Actions): + self.queued_actions.append(action) + def GetCurrentAction( self ) -> Actions: + return self.current_action def Interupt( self ): - pass \ No newline at end of file + pass + def __OnSequenceFinished( self ): + self.StartNextTask() + + def StartNextTask( self ): + if len(self.queued_actions) > 0: + self.current_action = self.queued_actions.popleft() + self.current_task = TaskManager.Task(self.__APICaller__, self.current_action) + self.current_task_number = 0 + self.__update_task__ = False + + def OnTaskFinished( self, task_number ): + self.__update_task__ = True + self.current_task_number+=1 + self.__total_robot_tasks_completed = task_number + + diff --git a/Managers/UpdateManager.py b/Managers/UpdateManager.py new file mode 100644 index 0000000..e3c5885 --- /dev/null +++ b/Managers/UpdateManager.py @@ -0,0 +1,38 @@ +import time +import json +import asyncio +import threading +from Serial.VexCommunications import Communications + +class UpdateManager(object): + class ScheduledUpdate(object): + def __init__(self, call_func, frequency= 0.5): + self._callfunc = call_func + self.sum_time = 0 + frequency = frequency + def __init__(self): + self.ScheduledUpdates = [] + self.__update_th = threading.Thread(target=self.__update) + + def ScheduleUpdate( self, call_func, frequency= 0.5): + self.ScheduledUpdates.append( UpdateManager.ScheduledUpdate(call_func, frequency)) + def StartUpdates( self ): + self.__update_th.start() + + def __update(self): + # do something init wise + ct = time.time() + dt = 0 + lt = ct + while True: + dt = (time.time() - lt) + lt = time.time() + for update in self.ScheduledUpdates: + update.sum_time += dt + if update.sum_time >= update.frequency: + update._callfunc() + update.sum_time = 0 + time.sleep(0.1) + + + diff --git a/Serial/VexCommunications.py b/Serial/VexCommunications.py index 220ebd8..65d5074 100644 --- a/Serial/VexCommunications.py +++ b/Serial/VexCommunications.py @@ -80,9 +80,7 @@ def FindDeserializer( byte: int ): return StringDeserializer class CallbackItem: - friendly_name = "" - callback = None - def __init__( self, name, callback ): + def __init__( self, name="", callback=None ): self.friendly_name = name self.callback = callback @@ -97,7 +95,7 @@ class Communications: __recieved_tags = False header_length = len(__packet_header) + 1 footer_length = len(__end_of_transmission) - + def __init__( self ): self.__recieved_tags = True self.__callbacks = [None] * 256 @@ -254,7 +252,13 @@ def RegisterCallback( self, name: str, method ): index = self.__FindCallbackItem(name) print("Registered:", index, name) self.__callbacks[index] = CallbackItem(name, method) - + + def HasCallback( self, name: str ) -> bool: + for idx, _callback in self.__callbacks.items(): + if _callback.friendly_name == name: + return True + return False + async def __ReadInput( self ): self.__FindPort() self.last_date = datetime.now() diff --git a/globals.py b/globals.py new file mode 100644 index 0000000..f830646 --- /dev/null +++ b/globals.py @@ -0,0 +1,6 @@ +DEBUG_MODE = False +DEBUG_VISUALIZE = False +CURRENT_TEAM_COLOR= 0 # [0: RED, 1: BLUE] +STARTING_POS = (0, 0) +STARTING_ANGLE = 0.0 +HORIZONTAL_LIMIT = False \ No newline at end of file diff --git a/main.py b/main.py index 457df12..8924659 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,12 @@ +from globals import * from Serial.VexCommunications import * from Managers.GameObjects import * from Managers.GameAnalysis import * from Managers.VisionManager import * from Managers.GameSteps import * from Managers.GameTasks import * +from Managers.UpdateManager import * +from Classes.GameDecision import * # import pyqtgraph as pg # from PyQt5 import QtWidgets, QtCore import cProfile @@ -12,134 +15,177 @@ import threading import datetime -DEBUG_VISUALIZE = True # from Testing.Visualize import * # def visualization(): # global objects # visualize_game_objects(objects.GetAll()) -# def GameLogic(camera: Vision, tasks: TaskManager, analyzer: Analyzer): -# t = time.time() -# # Beat the game -# while True: - -# for object in camera.CollectObjects(): -# objects.TraceObject(object) - -# nt = time.time() -# dt = nt - t - -# if DEBUG_VISUALIZE: - -# for bot in objects.GetBots(): -# update_robot_position_and_velocity(bot, dt) - -# # buf = objects.SerializeForComs() -# # comms.SendRawBuffer(buf) - -# # analyzer.WaitForCompletion() - -# # Task -> Actions -> Steps -> Directions -# # Reactive Program -> Reacts and responds its changing environment - -# if tasks.AlmostFinished(): -# decision = analyzer.FindBest(objects) - -# # if objects.GetCurrentTarget() != decision.GetTarget(): -# # objects.SetCurrentTarget(decision.GetTarget()) -# # steps = builder.BuildSteps(decision) -# # tasks.QueueTask(steps) -# else: -# pass -# # task = analyzer.TrackCompletion(objects, tasks.GetCurrentTask()) -# # if task.NeedsCorrection(): -# # tasks.Interupt() -# # tasks.QueueTask(task.GetSteps()) -# t = nt - - #pr.disable() - #ps = pstats.Stats(pr).sort_stats('cumtime') - #ps.print_stats() - #break -''' - Callbacks -''' -fcounter = 1 -def SerialTestJetsonToV5( params ): - global fcounter - print(f"Recieved: {fcounter}") - for param in range(len(params)): - print(params[param]) - fcounter += 1 - -def SerialTestV5ToJetson( params ): - global fcounter - print(f"Recieved: {fcounter}") - for param in range(len(params)): - print(params[param]) - fcounter += 1 -''' -End Callbacks -''' +mag_cnt = 0 +__mag_cnt_lck__ = threading.Lock() + +get_mag_estimate = lambda game_data, task_manager: game_data.GetMagCnt() + sum (1 for action in task_manager.queued_actions if action.type == Actions.Type.LOAD_DISK ) + +def GameLogic(camera: Vision, tasks: TaskManager, analyzer: Analyzer, objects: ObjectManager, scheduler: UpdateManager, comms: Communications): + t = time.time() + # Beat the game + while True: + + for object in camera.CollectObjects(): + objects.TraceObject(object) + + nt = time.time() + dt = nt - t + + if DEBUG_VISUALIZE: + pass # TODO: send data to Main Frame about the game state + # for bot in objects.GetBots(): + # update_robot_position_and_velocity(bot, dt) + + # buf = objects.SerializeForComs() + # comms.SendRawBuffer(buf) + + analyzer.WaitForCompletion() + + # Task -> Actions -> Steps -> Directions + # Reactive Program -> Reacts and responds its changing environment + + if tasks.AlmostFinished(): + decision = None + if get_mag_estimate(objects, tasks) >= 3: # We decide to shoot the disks + decision = Decision(objects.GetGoals()[GoalColor(CURRENT_TEAM_COLOR)], Actions.Type.SHOOT_DISK) + frisbees = objects.GetFrisbees() + closest_frisbee = min(frisbees, key= lambda obj: obj.distance) + if closest_frisbee.distance < 2.5: # Pick up the closest frisbee + decision = Decision(closest_frisbee, Actions.Type.LOAD_DISK) + else: + decision = analyzer.FindBest(objects, tasks) + + if objects.GetCurrentTarget() != decision.GetTarget(): + objects.SetCurrentTarget(decision.GetTarget()) + action = Actions(decision.target, decision.target_type, kwargs=decision.kwargs) + tasks.QueueTask(action) + else: + task = analyzer.TrackCompletion(objects, tasks.GetCurrentTask()) + if task.NeedsCorrection(): + tasks.Interupt() + tasks.QueueTask(task.GetSteps()) + # t = nt + + # pr.disable() + # ps = pstats.Stats(pr).sort_stats('cumtime') + # ps.print_stats() + # break + def Main() -> 0: + global fcounter, comms, scheduler, objects, analyzer, builder, tasks, camera + ''' + Callbacks + ''' + fcounter = 1 + def SerialTestJetsonToV5( params ): + pass + + def SerialTestV5ToJetson( params ): + global fcounter + print(f"Recieved: {fcounter}") + for param in range(len(params)): + print(params[param]) + fcounter += 1 + + def UpdateMagCount( params ): + global mag_cnt, __mag_cnt_lck__ + with __mag_cnt_lck__: + mag_cnt = params[0] + mg_copy = mag_cnt + objects.SetMagCnt(mg_copy) + + def UpdatePos( params ): + x = params[0] + y = params[1] + theta = params[2] + position = Vec(x, y, 0) + objects.UpdateRobot(position, theta) + + def OnTaskFinished( params ): + tasks.OnTaskFinished(task_number=params[0]) + + def NullUpdater( params ): + comms.SendPacket("null_callback", (0)) + + ''' + End Callbacks + ''' # global objects # Load serial Communications - # comms = Communications() - # comms.RegisterCallback("serial_test_v5_to_jetson", SerialTestV5ToJetson) - comms.RegisterCallback("serial_test_jetson_to_v5", SerialTestJetsonToV5) - # comms.Start() - # comms.WaitForTags() - camera = Vision(False) - camera.Start() - while True: - camera.CollectObjects() - - sent = 1 - ctime = time.time() - deltatime = 0 - ltime = ctime - while(True): - deltatime += (time.time() - ltime) * 1000 - ltime = time.time() - #print(deltatime) - if deltatime > 10: - deltatime = 0 - b = comms.SendPacket("TestFunction", 145.48, 541.785, "this is a long test string for testing sending string from the jetson to the v5 brain") - print(f"Sent: {sent}")#, int((ltime - ctime) * 10) / 10, b) - sent+=1 - time.sleep(0.005) - - # Init Game Object manager - # objects = ObjectManager() - + # camera = Vision(False) + # camera.Start() + # while True: + # camera.CollectObjects() + comms = Communications() + + if DEBUG_MODE: + sent = 1 + ctime = time.time() + deltatime = 0 + ltime = ctime + while(True): + deltatime += (time.time() - ltime) * 1000 + ltime = time.time() + #print(deltatime) + if deltatime > 25: + deltatime = 0 + b = comms.SendPacket("serial_test_jetson_to_v5", 3.14159, "Digits of PI:", "Today is the day we Win.") + # print(f"Sent: {sent}")#, int((ltime - ctime) * 10) / 10, b) + sent+=1 + time.sleep(0.005) + # Create Scheduler + scheduler = UpdateManager() + #Init Game Object manager + starting_vec = Vec(STARTING_POS[0], STARTING_POS[1], 0) + objects = ObjectManager(starting_pos=starting_vec, starting_angle= STARTING_ANGLE, comms=comms) # Init Game Analysis manager - # analyzer = Analyzer() - - # Init Game Step Builder - # builder = StepBuilder() + analyzer = Analyzer() # Init Task manager - # tasks = TaskManager() + tasks = TaskManager(object_manager=objects) # Init Vision manager with reporting to Game Analysis # True for fake data - # camera = Vision(DEBUG_VISUALIZE) - - # non_gui_thread = threading.Thread(target=GameLogic, args=(camera, tasks, analyzer)) + camera = None # Vision(DEBUG_VISUALIZE) - # # Start the non-GUI task in a separate thread - # non_gui_thread.start() + non_gui_thread = threading.Thread(target=GameLogic, args=(camera, tasks, analyzer, objects, scheduler, comms)) - # if DEBUG_VISUALIZE: - # app = QtWidgets.QApplication([]) - # visualize_game_objects(None) - # timer = QtCore.QTimer() - # timer.timeout.connect(visualization) - # timer.start(50) - # app.exec_() - - # GameLogic(camera, tasks, analyzer) + # Register callbacks + + if DEBUG_MODE: + comms.RegisterCallback("serial_test_v5_to_jetson", SerialTestV5ToJetson) + comms.RegisterCallback("serial_test_jetson_to_v5", SerialTestJetsonToV5) + else: + comms.RegisterCallback("update_mag_count", UpdateMagCount) + comms.RegisterCallback("task_finished", OnTaskFinished) + comms.RegisterCallback("get_pos", UpdatePos) + comms.Start() + comms.WaitForTags() + if comms.HasCallback("update_target_obj_dr_dtheta"): + scheduler.ScheduledUpdate(objects.CommsUpdateTargetDrDth, 0.05) + if comms.HasCallback("null_callback"): + scheduler.ScheduledUpdate(NullUpdater, 0.01) # Ensures the brain Does not get held up on CIN + + + # Start the non-GUI task in a separate thread + non_gui_thread.start() + + if DEBUG_VISUALIZE: + pass + # app = QtWidgets.QApplication([]) + # visualize_game_objects(None) + # timer = QtCore.QTimer() + # timer.timeout.connect(visualization) + # timer.start(50) + # app.exec_() + + # GameLogic(camera, tasks, analyzer, scheduler, comms) return 0 diff --git a/yolov5 b/yolov5 new file mode 160000 index 0000000..aa7c45c --- /dev/null +++ b/yolov5 @@ -0,0 +1 @@ +Subproject commit aa7c45c2cff71d498f42e9762c73cb9e5b0c699a From a4761f1dca530463297a8e02d2824aceaad7539e Mon Sep 17 00:00:00 2001 From: "DESKTOP-N9VDHAM\\John" Date: Thu, 27 Apr 2023 21:57:55 -0400 Subject: [PATCH 5/5] Adding handling Logic --- Managers/GameAnalysis.py | 49 ++++++++++++---------------------------- Managers/GameObjects.py | 4 ++-- Managers/GameTasks.py | 3 +-- globals.py | 3 ++- 4 files changed, 19 insertions(+), 40 deletions(-) diff --git a/Managers/GameAnalysis.py b/Managers/GameAnalysis.py index 76fd622..5e94e1e 100644 --- a/Managers/GameAnalysis.py +++ b/Managers/GameAnalysis.py @@ -11,6 +11,7 @@ class Analyzer: def __init__(self) -> None: self.__gameStartTime = time.time() * 1000 self.__hasher = None + self.__Turned = False pass # Finds the best decision # Add roller blocking @@ -39,43 +40,21 @@ def FindBest( self, game_data: ObjectManager, task_manager: TaskManager ) -> Dec hash_list.extend(hashedFrisbees) hash_list.extend(hashedRollers) - # actions = [ - # Actions.Type.CAPTURE_ROLLER, - # Actions.Type.DRIVE_TO_OBJECT, - # Actions.Type.LOAD_DISK, - # Actions.Type.SHOOT_DISK, - # Actions.Type.TURN_TO_ANGLE, - # Actions.Type.DRIVE_TO_OBJECT - # ] - - # def score_actions(frisbees: List[GameObject], rollers:List[GameObject], object_manager:ObjectManager, task_manager: TaskManager) -> List[Dict[str, Union[int, float]]]: - # dt = self.__GetDeltaTime() - # def CaptureRollerScore(frisbees: List[GameObject], rollers:List[GameObject], object_manager:ObjectManager, task_manager: TaskManager) -> List[Dict[str, Union[int, float]]]: - # kTime = 0.2 - # kDistance = 0.3 - # kOrientation = 0.3 - # cost = lambda dt, distance, orientation, roller: (((55 - (dt/1000) / 55) + kTime) * ((distance*kDistance + orientation*kOrientation) / 12) + int(math.inf*int(roller['color']) == CURRENT_TEAM_COLOR)) # Feet - - # def DriveToObjectScore(frisbees: List[GameObject], object_manager:ObjectManager, task_manager: TaskManager) -> List[Dict[str, Union[int, float]]]: - # kTime = 0.2 - # kDistance = 0.3 - # kOrientation = 0.3 - # kNum0bjs = 0.68 - # cost = lambda distance, orientation, dt, cnt_objs, : int((dt/1000)<45) * ((distance*kDistance + orientation*kOrientation) / 12) * kNum0bjs((3-cnt_objs)%4) - # for frisbee in frisbees: - # total_cost = cost() - # scored_items = for item.id: { - # "score": score, - # "object": item - # } - hash_list = sorted(hash_list, key=lambda x: x[next(iter(x.keys()))]['score'], reverse=True) - target_object = hash_list[0]['object'] - if target_object.type == ObjectType.FRISBEE: - return Decision(target_object, Actions.Type.DRIVE_TO_OBJECT) - elif target_object.type == ObjectType.ROLLER: - return Decision (target_object, Actions.Type.CAPTURE_ROLLER, kwargs= {'power': 0.35}) + # We have a target object + if hash_list: + target_object = hash_list[0]['object'] + if target_object.type == ObjectType.FRISBEE: + return Decision(target_object, Actions.Type.DRIVE_TO_OBJECT) + elif target_object.type == ObjectType.ROLLER: + return Decision (target_object, Actions.Type.CAPTURE_ROLLER, kwargs= {'power': 0.35}) + else: + # if self.__Turned: + # return Decision(None, Actions.Type.DRIVE_TO_TARGET, kwargs= {'r': 3.5, 'theta': 0, 'reversed': START_REVERSED}) + # else: + return Decision(None, Actions.Type.TURN_TO_ANGLE, kwargs= {'theta': 45, 'reversed': START_REVERSED}) + def __ThreadedHashing( self, objects: list, extra: dict) -> list: diff --git a/Managers/GameObjects.py b/Managers/GameObjects.py index ba25673..cb82df6 100644 --- a/Managers/GameObjects.py +++ b/Managers/GameObjects.py @@ -147,8 +147,8 @@ def ProcessImageData( self, image_data: List[List[float, float, ObjectType]]) -> print('Euc. Delta on Roller:', euclidean_distance) if euclidean_distance < min_euclidian_distance: id, min_euclidian_distance = i, euclidean_distance - if id != -1: - self.rollers[id] + if id != -1 and min_euclidian_distance < 60: # 5 Ft. + self.rollers[id].updatePos(Vec(interp_pos.x, interp_pos.y, 0)) self.PruneObjects() \ No newline at end of file diff --git a/Managers/GameTasks.py b/Managers/GameTasks.py index a0870b9..75ea046 100644 --- a/Managers/GameTasks.py +++ b/Managers/GameTasks.py @@ -152,11 +152,10 @@ def update( self, task_finished ): self.API_call_Sent = False elif self.action.isFinished is not None and self.sequence_position in self.action.seq_indexes_finished and self.action.isFinished(self.action): self.__API__Caller.SendClearTasks() - else: if not self.API_call_Sent: self.__API__Caller.MakeAPICall(self.__API_Calls[self.sequence_position], *self.__API_Args[self.sequence_position]) - + self.API_call_Sent = True return True if self.sequence_position >= self.sequence_length else False def UpdateTargetObject( self, target_object:GameObject ): diff --git a/globals.py b/globals.py index f830646..0968781 100644 --- a/globals.py +++ b/globals.py @@ -3,4 +3,5 @@ CURRENT_TEAM_COLOR= 0 # [0: RED, 1: BLUE] STARTING_POS = (0, 0) STARTING_ANGLE = 0.0 -HORIZONTAL_LIMIT = False \ No newline at end of file +HORIZONTAL_LIMIT = False +START_REVERSED = False \ No newline at end of file