utils.input
Controller input factory — config-driven managed input objects.
Public API::
from utils.input import InputFactory, get_factory
# In robotInit — create the factory (registers as active):
factory = InputFactory(config_path="data/inputs/controller.yaml")
# Anywhere else — fetch the active factory:
factory = get_factory()
button = factory.getButton("intake.run")
analog = factory.getAnalog("drivetrain.speed")
rumble = factory.getRumbleControl("general.rumble_left")
1"""Controller input factory — config-driven managed input objects. 2 3Public API:: 4 5 from utils.input import InputFactory, get_factory 6 7 # In robotInit — create the factory (registers as active): 8 factory = InputFactory(config_path="data/inputs/controller.yaml") 9 10 # Anywhere else — fetch the active factory: 11 factory = get_factory() 12 button = factory.getButton("intake.run") 13 analog = factory.getAnalog("drivetrain.speed") 14 rumble = factory.getRumbleControl("general.rumble_left") 15""" 16 17from utils.input.factory import InputFactory, get_factory 18from utils.input.managed_analog import ManagedAnalog 19from utils.input.managed_button import ManagedButton 20from utils.input.managed_rumble import ManagedRumble 21from utils.input.validation import ValidationIssue, validate_config 22 23__all__ = [ 24 "InputFactory", 25 "get_factory", 26 "ManagedAnalog", 27 "ManagedButton", 28 "ManagedRumble", 29 "ValidationIssue", 30 "validate_config", 31]
115class InputFactory: 116 """Config-driven controller input factory. 117 118 Loads configuration and creates managed input objects that wrap 119 wpilib HID devices with config-driven shaping, validation, and 120 NetworkTables publishing. 121 122 Args: 123 config: Pre-built FullConfig object (for testing). 124 config_path: Path to a single YAML with actions + controllers. 125 actions_path: Path to actions-only YAML (used with assignments_path). 126 assignments_path: Path to controllers-only YAML. 127 register_global: Controls singleton registration for get_factory(). 128 None (default) — register only if no factory exists yet 129 (first-created wins). True — always register, replacing any 130 existing factory. False — never register (standalone instance, 131 invisible to get_factory()). 132 133 Precedence: config > config_path > (actions_path + assignments_path) 134 """ 135 136 def __init__( 137 self, 138 config: FullConfig | None = None, 139 config_path: str | Path | None = None, 140 actions_path: str | Path | None = None, 141 assignments_path: str | Path | None = None, 142 register_global: bool | None = None, 143 ): 144 # Load configuration and track source for error messages 145 # Resolve relative paths against the project root (two levels up from 146 # utils/input/) so callers don't depend on cwd. 147 _project_root = Path(__file__).resolve().parent.parent.parent 148 if config is not None: 149 self._config = config 150 self._config_source = "<FullConfig object>" 151 self._config_files: list[Path] = [] 152 elif config_path is not None: 153 p = Path(config_path) 154 if not p.is_absolute(): 155 p = _project_root / p 156 self._config = load_config(p) 157 self._config_source = str(config_path) 158 self._config_files = [p.resolve()] 159 elif actions_path is not None and assignments_path is not None: 160 actions = load_actions_from_file(actions_path) 161 controllers = load_assignments_from_file(assignments_path) 162 self._config = FullConfig( 163 actions=actions, controllers=controllers) 164 self._config_source = ( 165 f"{actions_path} + {assignments_path}") 166 self._config_files = [ 167 Path(actions_path).resolve(), 168 Path(assignments_path).resolve()] 169 else: 170 raise ValueError( 171 "Must provide config, config_path, or " 172 "(actions_path + assignments_path)") 173 174 # Create controller instances 175 self._controllers: dict[int, ControllerState] = {} 176 for port, ctrl_config in self._config.controllers.items(): 177 self._controllers[port] = ControllerState(port, ctrl_config) 178 179 # Publish raw controllers to NT for dashboard inspection. 180 # Only publish if connected — avoids warning spam in sim 181 # when no joystick is plugged in (Sendable queries axes). 182 for port, state in self._controllers.items(): 183 if wpilib.DriverStation.isJoystickConnected(port): 184 wpilib.SmartDashboard.putData( 185 f"{_NT_BASE}/controllers/raw/{port}", 186 state.controller) 187 188 # Publish bindings info and config metadata to NT 189 publish_bindings_nt(_NT_BASE, self._controllers) 190 publish_config_metadata(_NT_BASE, self._config_files) 191 192 # Caches — all factory methods return the same object for a name 193 self._buttons: dict[str, ManagedButton] = {} 194 self._analogs: dict[str, ManagedAnalog] = {} 195 self._rumbles: dict[str, ManagedRumble] = {} 196 self._raw_buttons: dict[str, Callable[[], bool]] = {} 197 self._raw_analogs: dict[tuple, Callable[[], float]] = {} 198 self._va_generators: list[VirtualAnalogGenerator] = [] 199 200 # Register as the active factory for get_factory(). 201 # register_global=None (default): register only if no factory exists yet. 202 # register_global=True: always register (override existing). 203 # register_global=False: never register (standalone factory). 204 global _active_factory 205 if register_global is True: 206 _active_factory = self 207 elif register_global is None and _active_factory is None: 208 _active_factory = self 209 210 # Register an internal subsystem that syncs NT values each cycle. 211 # The CommandScheduler iterates subsystems in registration order 212 # (dict insertion order in Python 3.7+). Since the factory is 213 # created in robotInit before user subsystems, this updater is 214 # registered first and its periodic() runs before all others. 215 scheduler = commands2.CommandScheduler.getInstance() 216 if scheduler._subsystems: 217 log.warning( 218 "InputFactory created after %d other subsystem(s) — " 219 "NT sync may run after those subsystems read stale " 220 "values for one cycle. Create the factory before any " 221 "subsystems to guarantee ordering.", 222 len(scheduler._subsystems)) 223 self._updater = _FactoryUpdater(self) 224 225 # Eager action creation — pre-create all managed objects so their 226 # NT entries are published immediately. This lets the dashboard 227 # inspect and tune every action's parameters before robot code 228 # requests them. Uses required=False so unbound actions get 229 # graceful defaults instead of raising. 230 self._eager_init_active = True 231 for qn, action in self._config.actions.items(): 232 try: 233 if action.input_type in ( 234 InputType.BUTTON, InputType.BOOLEAN_TRIGGER 235 ): 236 self.getButton(qn, required=False) 237 elif action.input_type in ( 238 InputType.ANALOG, InputType.VIRTUAL_ANALOG 239 ): 240 self.getAnalog(qn, required=False) 241 elif action.input_type == InputType.OUTPUT: 242 self.getRumbleControl(qn, required=False) 243 except Exception: 244 log.warning( 245 "Eager creation failed for '%s', will retry on " 246 "first get*() call", qn, exc_info=True) 247 self._eager_init_active = False 248 249 @property 250 def config(self) -> FullConfig: 251 """The loaded configuration.""" 252 return self._config 253 254 # --- Name resolution --- 255 256 def _resolve_name(self, name: str, group: str | None) -> str: 257 """Resolve a name to a fully qualified action name. 258 259 - "group.name" -> "group.name" (dot present, group param ignored) 260 - ("name", group="intake") -> "intake.name" 261 - ("name", group=None) -> "general.name" 262 """ 263 if '.' in name: 264 return name 265 g = group if group is not None else "general" 266 return f"{g}.{name}" 267 268 def _find_binding(self, qualified_name: str 269 ) -> tuple[ControllerState, str] | None: 270 """Find which controller+input is bound to the given action. 271 272 Returns (controller_state, input_name) or None if not bound. 273 """ 274 for state in self._controllers.values(): 275 if qualified_name in state.action_to_input: 276 return state, state.action_to_input[qualified_name] 277 return None 278 279 def _mark_in_use(self, obj) -> None: 280 """Mark a managed object as in-use (called outside eager init).""" 281 if not self._eager_init_active and hasattr(obj, 'nt_in_use'): 282 obj.nt_in_use = True 283 284 # --- Factory methods --- 285 286 def getButton( 287 self, 288 name: str, 289 group: str | None = None, 290 required: bool = True, 291 default_value: bool = False, 292 ) -> ManagedButton: 293 """Get a managed button for the named action. 294 295 Handles InputType.BUTTON, POV, and BOOLEAN_TRIGGER. 296 297 Args: 298 name: Action name — either qualified "group.name" or short 299 "name". If no dot is present and group is None, the 300 "general" group is assumed (e.g. "fire" -> "general.fire"). 301 group: Explicit group override (used when name has no dot). 302 required: If True, raise KeyError when not found/not bound. 303 default_value: Value returned if unbound. 304 305 Returns: 306 ManagedButton wrapping a Trigger with the configured binding. 307 """ 308 qn = self._resolve_name(name, group) 309 310 # Return cached if exists 311 if qn in self._buttons: 312 self._mark_in_use(self._buttons[qn]) 313 return self._buttons[qn] 314 315 action = self._config.actions.get(qn) 316 binding = self._find_binding(qn) 317 318 if action is None: 319 if required: 320 raise KeyError( 321 f"Action '{qn}' not found in config " 322 f"({self._config_source})") 323 log.warning("Action '%s' not found, returning default", qn) 324 btn = ManagedButton(None, lambda: default_value, default_value) 325 self._buttons[qn] = btn 326 return btn 327 328 if binding is None: 329 if required: 330 raise KeyError( 331 f"Action '{qn}' exists but is not bound to any " 332 f"input ({self._config_source})") 333 log.warning("Action '%s' not bound, returning default", qn) 334 btn = ManagedButton( 335 action, lambda: default_value, default_value) 336 self._buttons[qn] = btn 337 return btn 338 339 state, input_name = binding 340 341 # For BOOLEAN_TRIGGER, create a shared mutable threshold ref 342 # so the condition closure reads the latest value each cycle 343 # without needing to rebuild the Trigger. 344 threshold_ref = None 345 if action.input_type == InputType.BOOLEAN_TRIGGER: 346 threshold_ref = [action.threshold] 347 condition = make_button_condition( 348 state, input_name, action, threshold_ref=threshold_ref) 349 350 # Create NT-enabled subclass 351 nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}" 352 klass = make_button_nt_class(nt_path, action) 353 btn = klass(action, condition, default_value) 354 btn._binding_info = format_binding_info(state, input_name) 355 if threshold_ref is not None: 356 btn._threshold_ref = threshold_ref 357 self._buttons[qn] = btn 358 self._mark_in_use(btn) 359 return btn 360 361 def getRawButton( 362 self, 363 name: str, 364 group: str | None = None, 365 required: bool = True, 366 ) -> Callable[[], bool]: 367 """Get a plain callable returning the button state. 368 369 No Trigger wrapping, no command binding — just a function. 370 For BOOLEAN_TRIGGER: returns lambda applying threshold comparison. 371 Results are cached — same name returns the same callable. 372 373 Args: 374 name: Action name — either qualified "group.name" or short 375 "name". If no dot is present and group is None, the 376 "general" group is assumed (e.g. "fire" -> "general.fire"). 377 group: Explicit group override (used when name has no dot). 378 required: If True, raise KeyError when not found/not bound. 379 """ 380 qn = self._resolve_name(name, group) 381 382 if qn in self._raw_buttons: 383 return self._raw_buttons[qn] 384 385 action = self._config.actions.get(qn) 386 binding = self._find_binding(qn) 387 388 if action is None or binding is None: 389 if required: 390 raise KeyError( 391 f"Action '{qn}' not found or not bound " 392 f"({self._config_source})") 393 log.warning("Action '%s' unavailable, returning False", qn) 394 fn = lambda: False 395 self._raw_buttons[qn] = fn 396 return fn 397 398 state, input_name = binding 399 fn = make_button_condition(state, input_name, action) 400 self._raw_buttons[qn] = fn 401 return fn 402 403 def getAnalog( 404 self, 405 name: str, 406 group: str | None = None, 407 required: bool = True, 408 default_value: float = 0.0, 409 ) -> ManagedAnalog: 410 """Get a managed analog input for the named action. 411 412 Handles InputType.ANALOG. Full shaping pipeline from config. 413 414 Args: 415 name: Action name — either qualified "group.name" or short 416 "name". If no dot is present and group is None, the 417 "general" group is assumed (e.g. "speed" -> "general.speed"). 418 group: Explicit group override (used when name has no dot). 419 required: If True, raise KeyError when not found/not bound. 420 default_value: Value returned if unbound. 421 422 Returns: 423 ManagedAnalog — callable returning shaped value. 424 """ 425 qn = self._resolve_name(name, group) 426 427 if qn in self._analogs: 428 self._mark_in_use(self._analogs[qn]) 429 return self._analogs[qn] 430 431 action = self._config.actions.get(qn) 432 binding = self._find_binding(qn) 433 434 if action is None: 435 if required: 436 raise KeyError( 437 f"Action '{qn}' not found in config " 438 f"({self._config_source})") 439 log.warning("Action '%s' not found, returning default", qn) 440 analog = ManagedAnalog( 441 None, lambda: default_value, default_value) 442 self._analogs[qn] = analog 443 return analog 444 445 if action.input_type == InputType.VIRTUAL_ANALOG: 446 if binding is None: 447 if not required: 448 log.warning( 449 "Action '%s' is VIRTUAL_ANALOG but not bound, " 450 "returning default", qn) 451 analog = ManagedAnalog( 452 action, lambda: default_value, default_value) 453 self._analogs[qn] = analog 454 return analog 455 raise KeyError( 456 f"VIRTUAL_ANALOG action '{qn}' exists but is not " 457 f"bound to any input ({self._config_source})") 458 state, input_name = binding 459 button_fn = make_button_condition(state, input_name, action) 460 generator = VirtualAnalogGenerator(action, button_fn) 461 self._va_generators.append(generator) 462 nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}" 463 klass = make_analog_nt_class(nt_path, action) 464 analog = klass(action, generator.get_value, default_value) 465 analog._binding_info = format_binding_info(state, input_name) 466 self._analogs[qn] = analog 467 self._mark_in_use(analog) 468 return analog 469 470 if binding is None: 471 if required: 472 raise KeyError( 473 f"Action '{qn}' exists but is not bound to any " 474 f"input ({self._config_source})") 475 log.warning("Action '%s' not bound, returning default", qn) 476 analog = ManagedAnalog( 477 action, lambda: default_value, default_value) 478 self._analogs[qn] = analog 479 return analog 480 481 state, input_name = binding 482 accessor = make_axis_accessor(state, input_name) 483 484 # Create NT-enabled subclass 485 nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}" 486 klass = make_analog_nt_class(nt_path, action) 487 analog = klass(action, accessor, default_value) 488 analog._binding_info = format_binding_info(state, input_name) 489 self._analogs[qn] = analog 490 self._mark_in_use(analog) 491 return analog 492 493 def getAnalogRaw( 494 self, 495 name: str, 496 group: str | None = None, 497 required: bool = True, 498 apply_invert: bool = False, 499 apply_deadband: bool = False, 500 apply_scale: bool = False, 501 ) -> Callable[[], float]: 502 """Get a callable returning a selectively-shaped axis value. 503 504 Only applies the requested transformations. Does not create 505 a managed object — returns a plain callable. 506 Results are cached by (name, apply_invert, apply_deadband, 507 apply_scale) — same args return the same callable. 508 509 Args: 510 name: Action name — either qualified "group.name" or short 511 "name". If no dot is present and group is None, the 512 "general" group is assumed. 513 group: Explicit group override (used when name has no dot). 514 required: If True, raise KeyError when unavailable. 515 apply_invert: Apply inversion from config. 516 apply_deadband: Apply deadband from config. 517 apply_scale: Apply scale from config. 518 """ 519 qn = self._resolve_name(name, group) 520 cache_key = (qn, apply_invert, apply_deadband, apply_scale) 521 522 if cache_key in self._raw_analogs: 523 return self._raw_analogs[cache_key] 524 525 action = self._config.actions.get(qn) 526 binding = self._find_binding(qn) 527 528 if action is None or binding is None: 529 if required: 530 raise KeyError( 531 f"Action '{qn}' not found or not bound " 532 f"({self._config_source})") 533 log.warning("Action '%s' unavailable, returning 0.0", qn) 534 fn = lambda: 0.0 535 self._raw_analogs[cache_key] = fn 536 return fn 537 538 state, input_name = binding 539 raw_accessor = make_axis_accessor(state, input_name) 540 541 # Build a selective pipeline using SCALED mode 542 # (applies only the requested transformations) 543 pipeline = build_shaping_pipeline( 544 inversion=action.inversion if apply_invert else False, 545 deadband=action.deadband if apply_deadband else 0.0, 546 trigger_mode=EventTriggerMode.SCALED, 547 scale=action.scale if apply_scale else 1.0, 548 extra={}, 549 action_name=qn, 550 ) 551 552 def _selective(): 553 return pipeline(raw_accessor()) 554 self._raw_analogs[cache_key] = _selective 555 return _selective 556 557 def getRumbleControl( 558 self, 559 name: str, 560 group: str | None = None, 561 required: bool = True, 562 ) -> ManagedRumble: 563 """Get a managed rumble output for the named action. 564 565 Args: 566 name: Action name — either qualified "group.name" or short 567 "name". If no dot is present and group is None, the 568 "general" group is assumed. 569 group: Explicit group override (used when name has no dot). 570 required: If True, raise KeyError when unavailable. 571 572 Returns: 573 ManagedRumble with set(value, timeout) interface. 574 """ 575 qn = self._resolve_name(name, group) 576 577 if qn in self._rumbles: 578 self._mark_in_use(self._rumbles[qn]) 579 return self._rumbles[qn] 580 581 action = self._config.actions.get(qn) 582 binding = self._find_binding(qn) 583 584 if action is None: 585 if required: 586 raise KeyError( 587 f"Action '{qn}' not found in config " 588 f"({self._config_source})") 589 log.warning("Action '%s' not found, returning no-op rumble", qn) 590 rumble = ManagedRumble(None, lambda v: None) 591 self._rumbles[qn] = rumble 592 return rumble 593 594 if binding is None: 595 if required: 596 raise KeyError( 597 f"Action '{qn}' exists but is not bound to any " 598 f"input ({self._config_source})") 599 log.warning("Action '%s' not bound, returning no-op rumble", qn) 600 rumble = ManagedRumble(action, lambda v: None) 601 self._rumbles[qn] = rumble 602 return rumble 603 604 state, input_name = binding 605 setter = make_output_setter(state, input_name) 606 607 # Create NT-enabled subclass 608 nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}" 609 klass = make_rumble_nt_class(nt_path, action) 610 rumble = klass(action, setter) 611 rumble._binding_info = format_binding_info(state, input_name) 612 self._rumbles[qn] = rumble 613 self._mark_in_use(rumble) 614 return rumble 615 616 # --- Controller access --- 617 618 def getController(self, port: int) -> wpilib.XboxController | None: 619 """Get the raw XboxController for a given port (for telemetry/logging).""" 620 state = self._controllers.get(port) 621 return state.controller if state is not None else None 622 623 # --- Periodic sync (called automatically by _FactoryUpdater) --- 624 625 def _update(self) -> None: 626 """Sync NT values into managed objects and handle rumble timeouts. 627 628 Called automatically each scheduler cycle by ``_FactoryUpdater``. 629 Because the factory registers before user subsystems, this runs 630 first — so managed inputs have fresh NT values before any 631 subsystem reads them. 632 633 NT values are read once per cycle and compared against cached 634 local values. If a dashboard change is detected, the local 635 property is updated and the pipeline is rebuilt. This prevents 636 mid-cycle inconsistency — all reads within a single cycle see 637 the same parameter snapshot. 638 639 Custom NT mappings (from ``mapParamToNtPath()``) are synced 640 after the auto-generated NT sync. Parameters with custom 641 mappings are skipped during auto-generated sync to prevent 642 conflicts. 643 644 All NT reads (both auto-generated and custom) happen here in 645 the main robot loop — not via ntcore listeners — because 646 listener callbacks fire on a background thread and would race 647 with property reads during pipeline execution. 648 """ 649 # Sync NT -> ManagedAnalog properties 650 for analog in self._analogs.values(): 651 if analog.action is None: 652 continue 653 sync_analog_nt(analog) 654 analog._sync_custom_maps() 655 656 # Sync NT -> ManagedButton properties (BOOLEAN_TRIGGER threshold) 657 for btn in self._buttons.values(): 658 if btn.action is None: 659 continue 660 sync_button_nt(btn) 661 btn._sync_custom_maps() 662 663 # Update rumble timeouts 664 for rumble in self._rumbles.values(): 665 rumble.update() 666 667 # Update virtual analog generators 668 for gen in self._va_generators: 669 gen.update() 670 671 # --- Future: dynamic remapping --- 672 673 def remap(self, action_name: str, port: int, 674 input_name: str) -> None: 675 """Swap the physical input for a named action. 676 677 Not implemented — reserved for future dynamic remapping. 678 """ 679 raise NotImplementedError("Dynamic remapping not yet implemented")
Config-driven controller input factory.
Loads configuration and creates managed input objects that wrap wpilib HID devices with config-driven shaping, validation, and NetworkTables publishing.
Args: config: Pre-built FullConfig object (for testing). config_path: Path to a single YAML with actions + controllers. actions_path: Path to actions-only YAML (used with assignments_path). assignments_path: Path to controllers-only YAML. register_global: Controls singleton registration for get_factory(). None (default) — register only if no factory exists yet (first-created wins). True — always register, replacing any existing factory. False — never register (standalone instance, invisible to get_factory()).
Precedence: config > config_path > (actions_path + assignments_path)
136 def __init__( 137 self, 138 config: FullConfig | None = None, 139 config_path: str | Path | None = None, 140 actions_path: str | Path | None = None, 141 assignments_path: str | Path | None = None, 142 register_global: bool | None = None, 143 ): 144 # Load configuration and track source for error messages 145 # Resolve relative paths against the project root (two levels up from 146 # utils/input/) so callers don't depend on cwd. 147 _project_root = Path(__file__).resolve().parent.parent.parent 148 if config is not None: 149 self._config = config 150 self._config_source = "<FullConfig object>" 151 self._config_files: list[Path] = [] 152 elif config_path is not None: 153 p = Path(config_path) 154 if not p.is_absolute(): 155 p = _project_root / p 156 self._config = load_config(p) 157 self._config_source = str(config_path) 158 self._config_files = [p.resolve()] 159 elif actions_path is not None and assignments_path is not None: 160 actions = load_actions_from_file(actions_path) 161 controllers = load_assignments_from_file(assignments_path) 162 self._config = FullConfig( 163 actions=actions, controllers=controllers) 164 self._config_source = ( 165 f"{actions_path} + {assignments_path}") 166 self._config_files = [ 167 Path(actions_path).resolve(), 168 Path(assignments_path).resolve()] 169 else: 170 raise ValueError( 171 "Must provide config, config_path, or " 172 "(actions_path + assignments_path)") 173 174 # Create controller instances 175 self._controllers: dict[int, ControllerState] = {} 176 for port, ctrl_config in self._config.controllers.items(): 177 self._controllers[port] = ControllerState(port, ctrl_config) 178 179 # Publish raw controllers to NT for dashboard inspection. 180 # Only publish if connected — avoids warning spam in sim 181 # when no joystick is plugged in (Sendable queries axes). 182 for port, state in self._controllers.items(): 183 if wpilib.DriverStation.isJoystickConnected(port): 184 wpilib.SmartDashboard.putData( 185 f"{_NT_BASE}/controllers/raw/{port}", 186 state.controller) 187 188 # Publish bindings info and config metadata to NT 189 publish_bindings_nt(_NT_BASE, self._controllers) 190 publish_config_metadata(_NT_BASE, self._config_files) 191 192 # Caches — all factory methods return the same object for a name 193 self._buttons: dict[str, ManagedButton] = {} 194 self._analogs: dict[str, ManagedAnalog] = {} 195 self._rumbles: dict[str, ManagedRumble] = {} 196 self._raw_buttons: dict[str, Callable[[], bool]] = {} 197 self._raw_analogs: dict[tuple, Callable[[], float]] = {} 198 self._va_generators: list[VirtualAnalogGenerator] = [] 199 200 # Register as the active factory for get_factory(). 201 # register_global=None (default): register only if no factory exists yet. 202 # register_global=True: always register (override existing). 203 # register_global=False: never register (standalone factory). 204 global _active_factory 205 if register_global is True: 206 _active_factory = self 207 elif register_global is None and _active_factory is None: 208 _active_factory = self 209 210 # Register an internal subsystem that syncs NT values each cycle. 211 # The CommandScheduler iterates subsystems in registration order 212 # (dict insertion order in Python 3.7+). Since the factory is 213 # created in robotInit before user subsystems, this updater is 214 # registered first and its periodic() runs before all others. 215 scheduler = commands2.CommandScheduler.getInstance() 216 if scheduler._subsystems: 217 log.warning( 218 "InputFactory created after %d other subsystem(s) — " 219 "NT sync may run after those subsystems read stale " 220 "values for one cycle. Create the factory before any " 221 "subsystems to guarantee ordering.", 222 len(scheduler._subsystems)) 223 self._updater = _FactoryUpdater(self) 224 225 # Eager action creation — pre-create all managed objects so their 226 # NT entries are published immediately. This lets the dashboard 227 # inspect and tune every action's parameters before robot code 228 # requests them. Uses required=False so unbound actions get 229 # graceful defaults instead of raising. 230 self._eager_init_active = True 231 for qn, action in self._config.actions.items(): 232 try: 233 if action.input_type in ( 234 InputType.BUTTON, InputType.BOOLEAN_TRIGGER 235 ): 236 self.getButton(qn, required=False) 237 elif action.input_type in ( 238 InputType.ANALOG, InputType.VIRTUAL_ANALOG 239 ): 240 self.getAnalog(qn, required=False) 241 elif action.input_type == InputType.OUTPUT: 242 self.getRumbleControl(qn, required=False) 243 except Exception: 244 log.warning( 245 "Eager creation failed for '%s', will retry on " 246 "first get*() call", qn, exc_info=True) 247 self._eager_init_active = False
249 @property 250 def config(self) -> FullConfig: 251 """The loaded configuration.""" 252 return self._config
The loaded configuration.
286 def getButton( 287 self, 288 name: str, 289 group: str | None = None, 290 required: bool = True, 291 default_value: bool = False, 292 ) -> ManagedButton: 293 """Get a managed button for the named action. 294 295 Handles InputType.BUTTON, POV, and BOOLEAN_TRIGGER. 296 297 Args: 298 name: Action name — either qualified "group.name" or short 299 "name". If no dot is present and group is None, the 300 "general" group is assumed (e.g. "fire" -> "general.fire"). 301 group: Explicit group override (used when name has no dot). 302 required: If True, raise KeyError when not found/not bound. 303 default_value: Value returned if unbound. 304 305 Returns: 306 ManagedButton wrapping a Trigger with the configured binding. 307 """ 308 qn = self._resolve_name(name, group) 309 310 # Return cached if exists 311 if qn in self._buttons: 312 self._mark_in_use(self._buttons[qn]) 313 return self._buttons[qn] 314 315 action = self._config.actions.get(qn) 316 binding = self._find_binding(qn) 317 318 if action is None: 319 if required: 320 raise KeyError( 321 f"Action '{qn}' not found in config " 322 f"({self._config_source})") 323 log.warning("Action '%s' not found, returning default", qn) 324 btn = ManagedButton(None, lambda: default_value, default_value) 325 self._buttons[qn] = btn 326 return btn 327 328 if binding is None: 329 if required: 330 raise KeyError( 331 f"Action '{qn}' exists but is not bound to any " 332 f"input ({self._config_source})") 333 log.warning("Action '%s' not bound, returning default", qn) 334 btn = ManagedButton( 335 action, lambda: default_value, default_value) 336 self._buttons[qn] = btn 337 return btn 338 339 state, input_name = binding 340 341 # For BOOLEAN_TRIGGER, create a shared mutable threshold ref 342 # so the condition closure reads the latest value each cycle 343 # without needing to rebuild the Trigger. 344 threshold_ref = None 345 if action.input_type == InputType.BOOLEAN_TRIGGER: 346 threshold_ref = [action.threshold] 347 condition = make_button_condition( 348 state, input_name, action, threshold_ref=threshold_ref) 349 350 # Create NT-enabled subclass 351 nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}" 352 klass = make_button_nt_class(nt_path, action) 353 btn = klass(action, condition, default_value) 354 btn._binding_info = format_binding_info(state, input_name) 355 if threshold_ref is not None: 356 btn._threshold_ref = threshold_ref 357 self._buttons[qn] = btn 358 self._mark_in_use(btn) 359 return btn
Get a managed button for the named action.
Handles InputType.BUTTON, POV, and BOOLEAN_TRIGGER.
Args: name: Action name — either qualified "group.name" or short "name". If no dot is present and group is None, the "general" group is assumed (e.g. "fire" -> "general.fire"). group: Explicit group override (used when name has no dot). required: If True, raise KeyError when not found/not bound. default_value: Value returned if unbound.
Returns: ManagedButton wrapping a Trigger with the configured binding.
361 def getRawButton( 362 self, 363 name: str, 364 group: str | None = None, 365 required: bool = True, 366 ) -> Callable[[], bool]: 367 """Get a plain callable returning the button state. 368 369 No Trigger wrapping, no command binding — just a function. 370 For BOOLEAN_TRIGGER: returns lambda applying threshold comparison. 371 Results are cached — same name returns the same callable. 372 373 Args: 374 name: Action name — either qualified "group.name" or short 375 "name". If no dot is present and group is None, the 376 "general" group is assumed (e.g. "fire" -> "general.fire"). 377 group: Explicit group override (used when name has no dot). 378 required: If True, raise KeyError when not found/not bound. 379 """ 380 qn = self._resolve_name(name, group) 381 382 if qn in self._raw_buttons: 383 return self._raw_buttons[qn] 384 385 action = self._config.actions.get(qn) 386 binding = self._find_binding(qn) 387 388 if action is None or binding is None: 389 if required: 390 raise KeyError( 391 f"Action '{qn}' not found or not bound " 392 f"({self._config_source})") 393 log.warning("Action '%s' unavailable, returning False", qn) 394 fn = lambda: False 395 self._raw_buttons[qn] = fn 396 return fn 397 398 state, input_name = binding 399 fn = make_button_condition(state, input_name, action) 400 self._raw_buttons[qn] = fn 401 return fn
Get a plain callable returning the button state.
No Trigger wrapping, no command binding — just a function. For BOOLEAN_TRIGGER: returns lambda applying threshold comparison. Results are cached — same name returns the same callable.
Args: name: Action name — either qualified "group.name" or short "name". If no dot is present and group is None, the "general" group is assumed (e.g. "fire" -> "general.fire"). group: Explicit group override (used when name has no dot). required: If True, raise KeyError when not found/not bound.
403 def getAnalog( 404 self, 405 name: str, 406 group: str | None = None, 407 required: bool = True, 408 default_value: float = 0.0, 409 ) -> ManagedAnalog: 410 """Get a managed analog input for the named action. 411 412 Handles InputType.ANALOG. Full shaping pipeline from config. 413 414 Args: 415 name: Action name — either qualified "group.name" or short 416 "name". If no dot is present and group is None, the 417 "general" group is assumed (e.g. "speed" -> "general.speed"). 418 group: Explicit group override (used when name has no dot). 419 required: If True, raise KeyError when not found/not bound. 420 default_value: Value returned if unbound. 421 422 Returns: 423 ManagedAnalog — callable returning shaped value. 424 """ 425 qn = self._resolve_name(name, group) 426 427 if qn in self._analogs: 428 self._mark_in_use(self._analogs[qn]) 429 return self._analogs[qn] 430 431 action = self._config.actions.get(qn) 432 binding = self._find_binding(qn) 433 434 if action is None: 435 if required: 436 raise KeyError( 437 f"Action '{qn}' not found in config " 438 f"({self._config_source})") 439 log.warning("Action '%s' not found, returning default", qn) 440 analog = ManagedAnalog( 441 None, lambda: default_value, default_value) 442 self._analogs[qn] = analog 443 return analog 444 445 if action.input_type == InputType.VIRTUAL_ANALOG: 446 if binding is None: 447 if not required: 448 log.warning( 449 "Action '%s' is VIRTUAL_ANALOG but not bound, " 450 "returning default", qn) 451 analog = ManagedAnalog( 452 action, lambda: default_value, default_value) 453 self._analogs[qn] = analog 454 return analog 455 raise KeyError( 456 f"VIRTUAL_ANALOG action '{qn}' exists but is not " 457 f"bound to any input ({self._config_source})") 458 state, input_name = binding 459 button_fn = make_button_condition(state, input_name, action) 460 generator = VirtualAnalogGenerator(action, button_fn) 461 self._va_generators.append(generator) 462 nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}" 463 klass = make_analog_nt_class(nt_path, action) 464 analog = klass(action, generator.get_value, default_value) 465 analog._binding_info = format_binding_info(state, input_name) 466 self._analogs[qn] = analog 467 self._mark_in_use(analog) 468 return analog 469 470 if binding is None: 471 if required: 472 raise KeyError( 473 f"Action '{qn}' exists but is not bound to any " 474 f"input ({self._config_source})") 475 log.warning("Action '%s' not bound, returning default", qn) 476 analog = ManagedAnalog( 477 action, lambda: default_value, default_value) 478 self._analogs[qn] = analog 479 return analog 480 481 state, input_name = binding 482 accessor = make_axis_accessor(state, input_name) 483 484 # Create NT-enabled subclass 485 nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}" 486 klass = make_analog_nt_class(nt_path, action) 487 analog = klass(action, accessor, default_value) 488 analog._binding_info = format_binding_info(state, input_name) 489 self._analogs[qn] = analog 490 self._mark_in_use(analog) 491 return analog
Get a managed analog input for the named action.
Handles InputType.ANALOG. Full shaping pipeline from config.
Args: name: Action name — either qualified "group.name" or short "name". If no dot is present and group is None, the "general" group is assumed (e.g. "speed" -> "general.speed"). group: Explicit group override (used when name has no dot). required: If True, raise KeyError when not found/not bound. default_value: Value returned if unbound.
Returns: ManagedAnalog — callable returning shaped value.
493 def getAnalogRaw( 494 self, 495 name: str, 496 group: str | None = None, 497 required: bool = True, 498 apply_invert: bool = False, 499 apply_deadband: bool = False, 500 apply_scale: bool = False, 501 ) -> Callable[[], float]: 502 """Get a callable returning a selectively-shaped axis value. 503 504 Only applies the requested transformations. Does not create 505 a managed object — returns a plain callable. 506 Results are cached by (name, apply_invert, apply_deadband, 507 apply_scale) — same args return the same callable. 508 509 Args: 510 name: Action name — either qualified "group.name" or short 511 "name". If no dot is present and group is None, the 512 "general" group is assumed. 513 group: Explicit group override (used when name has no dot). 514 required: If True, raise KeyError when unavailable. 515 apply_invert: Apply inversion from config. 516 apply_deadband: Apply deadband from config. 517 apply_scale: Apply scale from config. 518 """ 519 qn = self._resolve_name(name, group) 520 cache_key = (qn, apply_invert, apply_deadband, apply_scale) 521 522 if cache_key in self._raw_analogs: 523 return self._raw_analogs[cache_key] 524 525 action = self._config.actions.get(qn) 526 binding = self._find_binding(qn) 527 528 if action is None or binding is None: 529 if required: 530 raise KeyError( 531 f"Action '{qn}' not found or not bound " 532 f"({self._config_source})") 533 log.warning("Action '%s' unavailable, returning 0.0", qn) 534 fn = lambda: 0.0 535 self._raw_analogs[cache_key] = fn 536 return fn 537 538 state, input_name = binding 539 raw_accessor = make_axis_accessor(state, input_name) 540 541 # Build a selective pipeline using SCALED mode 542 # (applies only the requested transformations) 543 pipeline = build_shaping_pipeline( 544 inversion=action.inversion if apply_invert else False, 545 deadband=action.deadband if apply_deadband else 0.0, 546 trigger_mode=EventTriggerMode.SCALED, 547 scale=action.scale if apply_scale else 1.0, 548 extra={}, 549 action_name=qn, 550 ) 551 552 def _selective(): 553 return pipeline(raw_accessor()) 554 self._raw_analogs[cache_key] = _selective 555 return _selective
Get a callable returning a selectively-shaped axis value.
Only applies the requested transformations. Does not create a managed object — returns a plain callable. Results are cached by (name, apply_invert, apply_deadband, apply_scale) — same args return the same callable.
Args: name: Action name — either qualified "group.name" or short "name". If no dot is present and group is None, the "general" group is assumed. group: Explicit group override (used when name has no dot). required: If True, raise KeyError when unavailable. apply_invert: Apply inversion from config. apply_deadband: Apply deadband from config. apply_scale: Apply scale from config.
557 def getRumbleControl( 558 self, 559 name: str, 560 group: str | None = None, 561 required: bool = True, 562 ) -> ManagedRumble: 563 """Get a managed rumble output for the named action. 564 565 Args: 566 name: Action name — either qualified "group.name" or short 567 "name". If no dot is present and group is None, the 568 "general" group is assumed. 569 group: Explicit group override (used when name has no dot). 570 required: If True, raise KeyError when unavailable. 571 572 Returns: 573 ManagedRumble with set(value, timeout) interface. 574 """ 575 qn = self._resolve_name(name, group) 576 577 if qn in self._rumbles: 578 self._mark_in_use(self._rumbles[qn]) 579 return self._rumbles[qn] 580 581 action = self._config.actions.get(qn) 582 binding = self._find_binding(qn) 583 584 if action is None: 585 if required: 586 raise KeyError( 587 f"Action '{qn}' not found in config " 588 f"({self._config_source})") 589 log.warning("Action '%s' not found, returning no-op rumble", qn) 590 rumble = ManagedRumble(None, lambda v: None) 591 self._rumbles[qn] = rumble 592 return rumble 593 594 if binding is None: 595 if required: 596 raise KeyError( 597 f"Action '{qn}' exists but is not bound to any " 598 f"input ({self._config_source})") 599 log.warning("Action '%s' not bound, returning no-op rumble", qn) 600 rumble = ManagedRumble(action, lambda v: None) 601 self._rumbles[qn] = rumble 602 return rumble 603 604 state, input_name = binding 605 setter = make_output_setter(state, input_name) 606 607 # Create NT-enabled subclass 608 nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}" 609 klass = make_rumble_nt_class(nt_path, action) 610 rumble = klass(action, setter) 611 rumble._binding_info = format_binding_info(state, input_name) 612 self._rumbles[qn] = rumble 613 self._mark_in_use(rumble) 614 return rumble
Get a managed rumble output for the named action.
Args: name: Action name — either qualified "group.name" or short "name". If no dot is present and group is None, the "general" group is assumed. group: Explicit group override (used when name has no dot). required: If True, raise KeyError when unavailable.
Returns: ManagedRumble with set(value, timeout) interface.
618 def getController(self, port: int) -> wpilib.XboxController | None: 619 """Get the raw XboxController for a given port (for telemetry/logging).""" 620 state = self._controllers.get(port) 621 return state.controller if state is not None else None
Get the raw XboxController for a given port (for telemetry/logging).
673 def remap(self, action_name: str, port: int, 674 input_name: str) -> None: 675 """Swap the physical input for a named action. 676 677 Not implemented — reserved for future dynamic remapping. 678 """ 679 raise NotImplementedError("Dynamic remapping not yet implemented")
Swap the physical input for a named action.
Not implemented — reserved for future dynamic remapping.
66def get_factory() -> "InputFactory": 67 """Return the active InputFactory instance. 68 69 Allows any module (subsystems, commands, etc.) to fetch managed 70 inputs without having the factory passed through constructors:: 71 72 from utils.input import get_factory 73 rumble = get_factory().getRumbleControl("feedback.rumble") 74 75 Raises RuntimeError if no factory has been created yet. 76 """ 77 if _active_factory is None: 78 raise RuntimeError( 79 "InputFactory not initialized — create one in robotInit " 80 "before calling get_factory()") 81 return _active_factory
Return the active InputFactory instance.
Allows any module (subsystems, commands, etc.) to fetch managed inputs without having the factory passed through constructors::
from utils.input import get_factory
rumble = get_factory().getRumbleControl("feedback.rumble")
Raises RuntimeError if no factory has been created yet.
54class ManagedAnalog(NtMappingMixin): 55 """A managed analog input with full shaping pipeline. 56 57 Args: 58 action: The ActionDefinition this analog represents (may be None). 59 accessor: Callable returning the raw axis value each cycle. 60 default_value: Value returned when no accessor is bound. 61 """ 62 63 _PARAM_TYPES: dict[str, type] = { 64 "deadband": float, 65 "inversion": bool, 66 "scale": float, 67 "slew_rate": float, 68 } 69 70 def __init__( 71 self, 72 action: ActionDefinition | None, 73 accessor: Callable[[], float], 74 default_value: float = 0.0, 75 ): 76 self._action = action 77 self._accessor = accessor 78 self._default_value = default_value 79 80 # Mutable properties — initialized from action or defaults 81 if action is not None: 82 self._deadband = action.deadband 83 self._inversion = action.inversion 84 self._scale = action.scale 85 self._slew_rate = action.slew_rate 86 self._trigger_mode = action.trigger_mode 87 self._extra = action.extra or {} 88 else: 89 self._deadband = 0.0 90 self._inversion = False 91 self._scale = 1.0 92 self._slew_rate = 0.0 93 self._trigger_mode = EventTriggerMode.RAW 94 self._extra = {} 95 96 # Binding info — set by the factory after construction. 97 # Format: "ControllerName.input_name (channel)" 98 self._binding_info: str = "unbound" 99 100 self._init_nt_mapping() 101 102 self._pipeline: Callable[[float], float] = lambda x: x 103 self._slew_limiter = None 104 self._rebuild_pipeline() 105 self._rebuild_slew_limiter() 106 107 # --- Properties with pipeline rebuild --- 108 109 @property 110 def deadband(self) -> float: 111 return self._deadband 112 113 @deadband.setter 114 def deadband(self, value: float) -> None: 115 if self._deadband != value: 116 self._deadband = value 117 self._rebuild_pipeline() 118 119 @property 120 def inversion(self) -> bool: 121 return self._inversion 122 123 @inversion.setter 124 def inversion(self, value: bool) -> None: 125 if self._inversion != value: 126 self._inversion = value 127 self._rebuild_pipeline() 128 129 @property 130 def scale(self) -> float: 131 return self._scale 132 133 @scale.setter 134 def scale(self, value: float) -> None: 135 if self._scale != value: 136 self._scale = value 137 self._rebuild_pipeline() 138 139 @property 140 def slew_rate(self) -> float: 141 """Max output change rate in units/sec. 0 = disabled. 142 143 Applies symmetrically (same limit up and down) unless 144 ``extra["negative_slew_rate"]`` is set on the action. 145 """ 146 return self._slew_rate 147 148 @slew_rate.setter 149 def slew_rate(self, value: float) -> None: 150 if self._slew_rate != value: 151 self._slew_rate = value 152 self._rebuild_slew_limiter() 153 154 @property 155 def action(self) -> ActionDefinition | None: 156 return self._action 157 158 # --- Value access --- 159 160 def get(self) -> float: 161 """Return the shaped value through the full pipeline + slew limit.""" 162 shaped = self._pipeline(self._accessor()) 163 if self._slew_limiter is not None: 164 shaped = self._slew_limiter.calculate(shaped) 165 self._last_slew_output = shaped 166 return shaped 167 168 def getRaw(self) -> float: 169 """Return the unmodified raw HID value.""" 170 return self._accessor() 171 172 def __call__(self) -> float: 173 """Alias for get() — makes ManagedAnalog a Callable[[], float].""" 174 return self.get() 175 176 # --- Internal --- 177 178 def __str__(self) -> str: 179 name = self._action.qualified_name if self._action else "unbound" 180 mode = self._trigger_mode.value 181 parts = [f"ManagedAnalog('{name}'"] 182 parts.append(f"bind={self._binding_info}") 183 parts.append(f"mode={mode}") 184 parts.append(f"pipeline={self._pipeline}") 185 if self._slew_rate > 0: 186 neg = self._extra.get("negative_slew_rate", -self._slew_rate) 187 parts.append(f"slew_rate={self._slew_rate}/{neg}") 188 parts_str = ", ".join(parts) 189 return f"{parts_str})" 190 191 def _rebuild_pipeline(self) -> None: 192 """Rebuild the shaping closure from current properties.""" 193 self._pipeline = build_shaping_pipeline( 194 inversion=self._inversion, 195 deadband=self._deadband, 196 trigger_mode=self._trigger_mode, 197 scale=self._scale, 198 extra=self._extra, 199 action_name=( 200 self._action.qualified_name if self._action else ""), 201 ) 202 203 def _rebuild_slew_limiter(self) -> None: 204 """Recreate the SlewRateLimiter when rate changes. 205 206 SlewRateLimiter rate limits are constructor params, so a new 207 instance is needed when the rate changes. Setting slew_rate 208 to 0 disables the limiter. 209 210 The negative (decreasing) rate defaults to ``-slew_rate`` but 211 can be overridden via ``extra["negative_slew_rate"]`` in the 212 action's YAML config for asymmetric limiting. 213 214 Preserves the last output value so changing the rate at runtime 215 doesn't cause a transient glitch (the new limiter starts from 216 where the old one left off rather than resetting to 0). 217 """ 218 if self._slew_rate <= 0: 219 self._slew_limiter = None 220 return 221 try: 222 from wpimath.filter import SlewRateLimiter 223 neg_rate = self._extra.get( 224 "negative_slew_rate", -self._slew_rate) 225 initial = getattr(self, '_last_slew_output', 0.0) 226 self._slew_limiter = SlewRateLimiter( 227 self._slew_rate, neg_rate, initial) 228 except ImportError: 229 # Graceful degradation in test environments 230 self._slew_limiter = None 231 232 def _rebind(self, accessor: Callable[[], float]) -> None: 233 """Swap the raw accessor for dynamic remapping.""" 234 self._accessor = accessor
A managed analog input with full shaping pipeline.
Args: action: The ActionDefinition this analog represents (may be None). accessor: Callable returning the raw axis value each cycle. default_value: Value returned when no accessor is bound.
70 def __init__( 71 self, 72 action: ActionDefinition | None, 73 accessor: Callable[[], float], 74 default_value: float = 0.0, 75 ): 76 self._action = action 77 self._accessor = accessor 78 self._default_value = default_value 79 80 # Mutable properties — initialized from action or defaults 81 if action is not None: 82 self._deadband = action.deadband 83 self._inversion = action.inversion 84 self._scale = action.scale 85 self._slew_rate = action.slew_rate 86 self._trigger_mode = action.trigger_mode 87 self._extra = action.extra or {} 88 else: 89 self._deadband = 0.0 90 self._inversion = False 91 self._scale = 1.0 92 self._slew_rate = 0.0 93 self._trigger_mode = EventTriggerMode.RAW 94 self._extra = {} 95 96 # Binding info — set by the factory after construction. 97 # Format: "ControllerName.input_name (channel)" 98 self._binding_info: str = "unbound" 99 100 self._init_nt_mapping() 101 102 self._pipeline: Callable[[float], float] = lambda x: x 103 self._slew_limiter = None 104 self._rebuild_pipeline() 105 self._rebuild_slew_limiter()
139 @property 140 def slew_rate(self) -> float: 141 """Max output change rate in units/sec. 0 = disabled. 142 143 Applies symmetrically (same limit up and down) unless 144 ``extra["negative_slew_rate"]`` is set on the action. 145 """ 146 return self._slew_rate
Max output change rate in units/sec. 0 = disabled.
Applies symmetrically (same limit up and down) unless
extra["negative_slew_rate"] is set on the action.
160 def get(self) -> float: 161 """Return the shaped value through the full pipeline + slew limit.""" 162 shaped = self._pipeline(self._accessor()) 163 if self._slew_limiter is not None: 164 shaped = self._slew_limiter.calculate(shaped) 165 self._last_slew_output = shaped 166 return shaped
Return the shaped value through the full pipeline + slew limit.
25class ManagedButton(NtMappingMixin): 26 """A managed boolean input backed by a Trigger. 27 28 Args: 29 action: The ActionDefinition this button represents (may be None 30 for unbound/default buttons). 31 condition: Callable returning the current boolean state. 32 default_value: Value returned when no condition is bound. 33 """ 34 35 _PARAM_TYPES: dict[str, type] = { 36 "threshold": float, 37 } 38 39 def __init__( 40 self, 41 action: ActionDefinition | None, 42 condition: Callable[[], bool], 43 default_value: bool = False, 44 ): 45 self._action = action 46 self._condition = condition 47 self._default_value = default_value 48 self._trigger = Trigger(condition) 49 50 # Mutable threshold ref for BOOLEAN_TRIGGER live-tuning. 51 # The factory sets this to a shared list that the condition 52 # closure also reads, so threshold changes take effect 53 # without rebuilding the Trigger or losing command bindings. 54 self._threshold_ref: list[float] | None = None 55 56 # Binding info — set by the factory after construction. 57 # Format: "ControllerName.input_name (channel)" 58 self._binding_info: str = "unbound" 59 60 self._init_nt_mapping() 61 62 # --- Threshold property (BOOLEAN_TRIGGER live-tuning) --- 63 64 @property 65 def threshold(self) -> float: 66 """Current threshold value for BOOLEAN_TRIGGER conditions.""" 67 if self._threshold_ref is not None: 68 return self._threshold_ref[0] 69 return self._action.threshold if self._action is not None else 0.5 70 71 @threshold.setter 72 def threshold(self, value: float) -> None: 73 if self._threshold_ref is not None: 74 self._threshold_ref[0] = float(value) 75 76 # --- Mixin overrides for threshold-specific behavior --- 77 78 def _validate_param(self, param: str) -> str | None: 79 """Reject threshold mapping on non-BOOLEAN_TRIGGER actions.""" 80 if (param == "threshold" 81 and self._action is not None 82 and self._action.input_type != InputType.BOOLEAN_TRIGGER): 83 return ( 84 f"Cannot map 'threshold' on a non-BOOLEAN_TRIGGER action " 85 f"('{self._action.qualified_name}' is " 86 f"{self._action.input_type.value})") 87 return None 88 89 def _get_param_value(self, param: str): 90 """Read threshold from the property (which reads _threshold_ref).""" 91 if param == "threshold": 92 return self.threshold 93 return getattr(self, param) 94 95 def _set_param_value(self, param: str, value) -> None: 96 """Write threshold via the property and sync NT if available.""" 97 if param == "threshold": 98 self.threshold = value 99 if hasattr(self, 'nt_threshold'): 100 self.nt_threshold = value 101 else: 102 setattr(self, param, value) 103 104 # --- Drop-in Trigger binding methods --- 105 106 def onTrue(self, command: commands2.Command) -> "ManagedButton": 107 """Schedule command when condition becomes True.""" 108 self._trigger.onTrue(command) 109 return self 110 111 def onFalse(self, command: commands2.Command) -> "ManagedButton": 112 """Schedule command when condition becomes False.""" 113 self._trigger.onFalse(command) 114 return self 115 116 def whileTrue(self, command: commands2.Command) -> "ManagedButton": 117 """Schedule command while condition is True, cancel on False.""" 118 self._trigger.whileTrue(command) 119 return self 120 121 def whileFalse(self, command: commands2.Command) -> "ManagedButton": 122 """Schedule command while condition is False, cancel on True.""" 123 self._trigger.whileFalse(command) 124 return self 125 126 def toggleOnTrue(self, command: commands2.Command) -> "ManagedButton": 127 """Toggle command scheduling each time condition becomes True.""" 128 self._trigger.toggleOnTrue(command) 129 return self 130 131 _BINDING_MAP = { 132 EventTriggerMode.ON_TRUE: "onTrue", 133 EventTriggerMode.ON_FALSE: "onFalse", 134 EventTriggerMode.WHILE_TRUE: "whileTrue", 135 EventTriggerMode.WHILE_FALSE: "whileFalse", 136 EventTriggerMode.TOGGLE_ON_TRUE: "toggleOnTrue", 137 } 138 139 def bind( 140 self, 141 command: commands2.Command, 142 mode: EventTriggerMode | None = None, 143 ) -> "ManagedButton": 144 """Bind a command using the configured or overridden trigger mode. 145 146 Args: 147 command: The command to bind. 148 mode: Override the YAML-configured trigger mode. When None 149 (default), uses the action's configured mode. Pass an 150 explicit EventTriggerMode to ignore the config, e.g. 151 ``btn.bind(cmd, mode=EventTriggerMode.WHILE_TRUE)``. 152 153 Falls back to onTrue when no action is set and no override given. 154 """ 155 if mode is None: 156 mode = (self._action.trigger_mode 157 if self._action is not None 158 else EventTriggerMode.ON_TRUE) 159 method_name = self._BINDING_MAP.get(mode, "onTrue") 160 return getattr(self, method_name)(command) 161 162 # --- State access --- 163 164 def get(self) -> bool: 165 """Return the current boolean state.""" 166 return self._condition() 167 168 @property 169 def trigger(self) -> Trigger: 170 """Access the underlying Trigger for advanced composition.""" 171 return self._trigger 172 173 @property 174 def action(self) -> ActionDefinition | None: 175 """The action definition, if any.""" 176 return self._action 177 178 def __str__(self) -> str: 179 name = self._action.qualified_name if self._action else "unbound" 180 parts = [f"ManagedButton('{name}'"] 181 parts.append(f"bind={self._binding_info}") 182 if self._action is not None: 183 parts.append(f"type={self._action.input_type.value}") 184 parts.append(f"mode={self._action.trigger_mode.value}") 185 if self._action.input_type == InputType.BOOLEAN_TRIGGER: 186 parts.append(f"threshold={self.threshold}") 187 parts_str = ", ".join(parts) 188 return f"{parts_str})" 189 190 # --- Remapping support --- 191 192 def _rebind(self, condition: Callable[[], bool]) -> None: 193 """Swap the condition and rebuild the internal Trigger. 194 195 Used by factory.remap() for dynamic remapping. 196 Note: existing command bindings on the old Trigger will no 197 longer fire — callers must re-register bindings after remap. 198 """ 199 self._condition = condition 200 self._trigger = Trigger(condition)
A managed boolean input backed by a Trigger.
Args: action: The ActionDefinition this button represents (may be None for unbound/default buttons). condition: Callable returning the current boolean state. default_value: Value returned when no condition is bound.
39 def __init__( 40 self, 41 action: ActionDefinition | None, 42 condition: Callable[[], bool], 43 default_value: bool = False, 44 ): 45 self._action = action 46 self._condition = condition 47 self._default_value = default_value 48 self._trigger = Trigger(condition) 49 50 # Mutable threshold ref for BOOLEAN_TRIGGER live-tuning. 51 # The factory sets this to a shared list that the condition 52 # closure also reads, so threshold changes take effect 53 # without rebuilding the Trigger or losing command bindings. 54 self._threshold_ref: list[float] | None = None 55 56 # Binding info — set by the factory after construction. 57 # Format: "ControllerName.input_name (channel)" 58 self._binding_info: str = "unbound" 59 60 self._init_nt_mapping()
64 @property 65 def threshold(self) -> float: 66 """Current threshold value for BOOLEAN_TRIGGER conditions.""" 67 if self._threshold_ref is not None: 68 return self._threshold_ref[0] 69 return self._action.threshold if self._action is not None else 0.5
Current threshold value for BOOLEAN_TRIGGER conditions.
106 def onTrue(self, command: commands2.Command) -> "ManagedButton": 107 """Schedule command when condition becomes True.""" 108 self._trigger.onTrue(command) 109 return self
Schedule command when condition becomes True.
111 def onFalse(self, command: commands2.Command) -> "ManagedButton": 112 """Schedule command when condition becomes False.""" 113 self._trigger.onFalse(command) 114 return self
Schedule command when condition becomes False.
116 def whileTrue(self, command: commands2.Command) -> "ManagedButton": 117 """Schedule command while condition is True, cancel on False.""" 118 self._trigger.whileTrue(command) 119 return self
Schedule command while condition is True, cancel on False.
121 def whileFalse(self, command: commands2.Command) -> "ManagedButton": 122 """Schedule command while condition is False, cancel on True.""" 123 self._trigger.whileFalse(command) 124 return self
Schedule command while condition is False, cancel on True.
126 def toggleOnTrue(self, command: commands2.Command) -> "ManagedButton": 127 """Toggle command scheduling each time condition becomes True.""" 128 self._trigger.toggleOnTrue(command) 129 return self
Toggle command scheduling each time condition becomes True.
139 def bind( 140 self, 141 command: commands2.Command, 142 mode: EventTriggerMode | None = None, 143 ) -> "ManagedButton": 144 """Bind a command using the configured or overridden trigger mode. 145 146 Args: 147 command: The command to bind. 148 mode: Override the YAML-configured trigger mode. When None 149 (default), uses the action's configured mode. Pass an 150 explicit EventTriggerMode to ignore the config, e.g. 151 ``btn.bind(cmd, mode=EventTriggerMode.WHILE_TRUE)``. 152 153 Falls back to onTrue when no action is set and no override given. 154 """ 155 if mode is None: 156 mode = (self._action.trigger_mode 157 if self._action is not None 158 else EventTriggerMode.ON_TRUE) 159 method_name = self._BINDING_MAP.get(mode, "onTrue") 160 return getattr(self, method_name)(command)
Bind a command using the configured or overridden trigger mode.
Args:
command: The command to bind.
mode: Override the YAML-configured trigger mode. When None
(default), uses the action's configured mode. Pass an
explicit EventTriggerMode to ignore the config, e.g.
btn.bind(cmd, mode=EventTriggerMode.WHILE_TRUE).
Falls back to onTrue when no action is set and no override given.
168 @property 169 def trigger(self) -> Trigger: 170 """Access the underlying Trigger for advanced composition.""" 171 return self._trigger
Access the underlying Trigger for advanced composition.
173 @property 174 def action(self) -> ActionDefinition | None: 175 """The action definition, if any.""" 176 return self._action
The action definition, if any.
14class ManagedRumble: 15 """A managed rumble output channel. 16 17 Args: 18 action: The ActionDefinition this output represents (may be None). 19 setter: Callable that sets the rumble intensity (0.0 to 1.0). 20 """ 21 22 def __init__( 23 self, 24 action: ActionDefinition | None, 25 setter: Callable[[float], None], 26 ): 27 self._action = action 28 self._setter = setter 29 self._stop_time: float | None = None 30 self._current_value: float = 0.0 31 32 # Binding info — set by the factory after construction. 33 # Format: "ControllerName.input_name (channel)" 34 self._binding_info: str = "unbound" 35 36 @property 37 def action(self) -> ActionDefinition | None: 38 return self._action 39 40 def set(self, value: float, timeout: float = 0.0) -> None: 41 """Set rumble intensity. 42 43 Args: 44 value: Intensity from 0.0 to 1.0. 45 timeout: If > 0, auto-stop after this many seconds. 46 """ 47 self._current_value = value 48 self._setter(value) 49 if timeout > 0: 50 self._stop_time = time.monotonic() + timeout 51 else: 52 self._stop_time = None 53 54 def stop(self) -> None: 55 """Stop the rumble immediately.""" 56 self._current_value = 0.0 57 self._setter(0.0) 58 self._stop_time = None 59 60 def update(self) -> None: 61 """Check timeouts — call once per robot cycle.""" 62 if (self._stop_time is not None 63 and time.monotonic() >= self._stop_time): 64 self.stop() 65 66 def __str__(self) -> str: 67 name = self._action.qualified_name if self._action else "unbound" 68 return f"ManagedRumble('{name}', bind={self._binding_info})" 69 70 def _rebind(self, setter: Callable[[float], None]) -> None: 71 """Swap the output setter for dynamic remapping.""" 72 self._setter = setter
A managed rumble output channel.
Args: action: The ActionDefinition this output represents (may be None). setter: Callable that sets the rumble intensity (0.0 to 1.0).
22 def __init__( 23 self, 24 action: ActionDefinition | None, 25 setter: Callable[[float], None], 26 ): 27 self._action = action 28 self._setter = setter 29 self._stop_time: float | None = None 30 self._current_value: float = 0.0 31 32 # Binding info — set by the factory after construction. 33 # Format: "ControllerName.input_name (channel)" 34 self._binding_info: str = "unbound"
40 def set(self, value: float, timeout: float = 0.0) -> None: 41 """Set rumble intensity. 42 43 Args: 44 value: Intensity from 0.0 to 1.0. 45 timeout: If > 0, auto-stop after this many seconds. 46 """ 47 self._current_value = value 48 self._setter(value) 49 if timeout > 0: 50 self._stop_time = time.monotonic() + timeout 51 else: 52 self._stop_time = None
Set rumble intensity.
Args: value: Intensity from 0.0 to 1.0. timeout: If > 0, auto-stop after this many seconds.
21@dataclass 22class ValidationIssue: 23 """A single validation finding.""" 24 level: str # "error" or "warning" 25 message: str 26 context: str = "" 27 28 def __str__(self) -> str: 29 prefix = f"[{self.level.upper()}]" 30 if self.context: 31 return f"{prefix} {self.context}: {self.message}" 32 return f"{prefix} {self.message}"
A single validation finding.
142def validate_config(config: FullConfig) -> list[ValidationIssue]: 143 """Validate a full controller configuration. 144 145 Returns a list of issues (errors and warnings). 146 An empty list means the config is valid. 147 """ 148 issues: list[ValidationIssue] = [] 149 150 # Validate each action definition 151 for action in config.actions.values(): 152 issues.extend(_validate_action(action)) 153 154 # Validate controller bindings 155 for port, ctrl in config.controllers.items(): 156 ctrl_ctx = f"controller {port} ({ctrl.name or 'unnamed'})" 157 158 for input_name, action_names in ctrl.bindings.items(): 159 # Check input name is recognized 160 if input_name not in ALL_INPUT_NAMES: 161 issues.append(ValidationIssue( 162 "error", 163 f"Unknown input name '{input_name}'", 164 ctrl_ctx)) 165 166 category = get_input_category(input_name) 167 168 for action_name in action_names: 169 # Check action exists 170 if action_name not in config.actions: 171 issues.append(ValidationIssue( 172 "error", 173 f"Binding references unknown action " 174 f"'{action_name}'", 175 f"{ctrl_ctx} / {input_name}")) 176 continue 177 178 action = config.actions[action_name] 179 180 # Type consistency: action input_type vs physical input 181 if (action.input_type == InputType.ANALOG 182 and category == "button"): 183 issues.append(ValidationIssue( 184 "warning", 185 f"Analog action '{action_name}' bound to " 186 f"button input '{input_name}'", 187 ctrl_ctx)) 188 189 if (action.input_type == InputType.BUTTON 190 and category == "axis"): 191 issues.append(ValidationIssue( 192 "warning", 193 f"Button action '{action_name}' bound to " 194 f"axis input '{input_name}'", 195 ctrl_ctx)) 196 197 if (action.input_type == InputType.BOOLEAN_TRIGGER 198 and category != "axis"): 199 issues.append(ValidationIssue( 200 "error", 201 f"BOOLEAN_TRIGGER action '{action_name}' must " 202 f"be bound to an axis input, got '{input_name}'", 203 ctrl_ctx)) 204 205 if (action.input_type == InputType.VIRTUAL_ANALOG 206 and category not in ("button", "pov")): 207 issues.append(ValidationIssue( 208 "error", 209 f"VIRTUAL_ANALOG action '{action_name}' must " 210 f"be bound to a button/POV input, " 211 f"got '{input_name}'", 212 ctrl_ctx)) 213 214 if (action.input_type == InputType.OUTPUT 215 and category != "output"): 216 issues.append(ValidationIssue( 217 "warning", 218 f"Output action '{action_name}' bound to " 219 f"non-output input '{input_name}'", 220 ctrl_ctx)) 221 222 # Warn about empty action definitions 223 for action in config.actions.values(): 224 if (not action.description 225 and action.input_type == InputType.BUTTON 226 and action.trigger_mode == EventTriggerMode.ON_TRUE 227 and action.deadband == 0.0 228 and action.scale == 1.0 229 and not action.extra): 230 issues.append(ValidationIssue( 231 "warning", 232 "Action has all default values (possibly empty definition)", 233 action.qualified_name)) 234 235 return issues
Validate a full controller configuration.
Returns a list of issues (errors and warnings). An empty list means the config is valid.