utils.input

Controller input factory — config-driven managed input objects.

Public API::

from utils.input import InputFactory, get_factory

# In robotInit — create the factory (registers as active):
factory = InputFactory(config_path="data/inputs/controller.yaml")

# Anywhere else — fetch the active factory:
factory = get_factory()
button = factory.getButton("intake.run")
analog = factory.getAnalog("drivetrain.speed")
rumble = factory.getRumbleControl("general.rumble_left")
 1"""Controller input factory — config-driven managed input objects.
 2
 3Public API::
 4
 5    from utils.input import InputFactory, get_factory
 6
 7    # In robotInit — create the factory (registers as active):
 8    factory = InputFactory(config_path="data/inputs/controller.yaml")
 9
10    # Anywhere else — fetch the active factory:
11    factory = get_factory()
12    button = factory.getButton("intake.run")
13    analog = factory.getAnalog("drivetrain.speed")
14    rumble = factory.getRumbleControl("general.rumble_left")
15"""
16
17from utils.input.factory import InputFactory, get_factory
18from utils.input.managed_analog import ManagedAnalog
19from utils.input.managed_button import ManagedButton
20from utils.input.managed_rumble import ManagedRumble
21from utils.input.validation import ValidationIssue, validate_config
22
# Public API of the package: the names exported by
# ``from utils.input import *``.  Each entry is imported above from its
# implementing submodule.
__all__ = [
    "InputFactory",
    "get_factory",
    "ManagedAnalog",
    "ManagedButton",
    "ManagedRumble",
    "ValidationIssue",
    "validate_config",
]
class InputFactory:
115class InputFactory:
116    """Config-driven controller input factory.
117
118    Loads configuration and creates managed input objects that wrap
119    wpilib HID devices with config-driven shaping, validation, and
120    NetworkTables publishing.
121
122    Args:
123        config: Pre-built FullConfig object (for testing).
124        config_path: Path to a single YAML with actions + controllers.
125        actions_path: Path to actions-only YAML (used with assignments_path).
126        assignments_path: Path to controllers-only YAML.
127        register_global: Controls singleton registration for get_factory().
128            None (default) — register only if no factory exists yet
129            (first-created wins).  True — always register, replacing any
130            existing factory.  False — never register (standalone instance,
131            invisible to get_factory()).
132
133    Precedence: config > config_path > (actions_path + assignments_path)
134    """
135
136    def __init__(
137        self,
138        config: FullConfig | None = None,
139        config_path: str | Path | None = None,
140        actions_path: str | Path | None = None,
141        assignments_path: str | Path | None = None,
142        register_global: bool | None = None,
143    ):
144        # Load configuration and track source for error messages
145        # Resolve relative paths against the project root (two levels up from
146        # utils/input/) so callers don't depend on cwd.
147        _project_root = Path(__file__).resolve().parent.parent.parent
148        if config is not None:
149            self._config = config
150            self._config_source = "<FullConfig object>"
151            self._config_files: list[Path] = []
152        elif config_path is not None:
153            p = Path(config_path)
154            if not p.is_absolute():
155                p = _project_root / p
156            self._config = load_config(p)
157            self._config_source = str(config_path)
158            self._config_files = [p.resolve()]
159        elif actions_path is not None and assignments_path is not None:
160            actions = load_actions_from_file(actions_path)
161            controllers = load_assignments_from_file(assignments_path)
162            self._config = FullConfig(
163                actions=actions, controllers=controllers)
164            self._config_source = (
165                f"{actions_path} + {assignments_path}")
166            self._config_files = [
167                Path(actions_path).resolve(),
168                Path(assignments_path).resolve()]
169        else:
170            raise ValueError(
171                "Must provide config, config_path, or "
172                "(actions_path + assignments_path)")
173
174        # Create controller instances
175        self._controllers: dict[int, ControllerState] = {}
176        for port, ctrl_config in self._config.controllers.items():
177            self._controllers[port] = ControllerState(port, ctrl_config)
178
179        # Publish raw controllers to NT for dashboard inspection.
180        # Only publish if connected — avoids warning spam in sim
181        # when no joystick is plugged in (Sendable queries axes).
182        for port, state in self._controllers.items():
183            if wpilib.DriverStation.isJoystickConnected(port):
184                wpilib.SmartDashboard.putData(
185                    f"{_NT_BASE}/controllers/raw/{port}",
186                    state.controller)
187
188        # Publish bindings info and config metadata to NT
189        publish_bindings_nt(_NT_BASE, self._controllers)
190        publish_config_metadata(_NT_BASE, self._config_files)
191
192        # Caches — all factory methods return the same object for a name
193        self._buttons: dict[str, ManagedButton] = {}
194        self._analogs: dict[str, ManagedAnalog] = {}
195        self._rumbles: dict[str, ManagedRumble] = {}
196        self._raw_buttons: dict[str, Callable[[], bool]] = {}
197        self._raw_analogs: dict[tuple, Callable[[], float]] = {}
198        self._va_generators: list[VirtualAnalogGenerator] = []
199
200        # Register as the active factory for get_factory().
201        # register_global=None (default): register only if no factory exists yet.
202        # register_global=True: always register (override existing).
203        # register_global=False: never register (standalone factory).
204        global _active_factory
205        if register_global is True:
206            _active_factory = self
207        elif register_global is None and _active_factory is None:
208            _active_factory = self
209
210        # Register an internal subsystem that syncs NT values each cycle.
211        # The CommandScheduler iterates subsystems in registration order
212        # (dict insertion order in Python 3.7+).  Since the factory is
213        # created in robotInit before user subsystems, this updater is
214        # registered first and its periodic() runs before all others.
215        scheduler = commands2.CommandScheduler.getInstance()
216        if scheduler._subsystems:
217            log.warning(
218                "InputFactory created after %d other subsystem(s) — "
219                "NT sync may run after those subsystems read stale "
220                "values for one cycle. Create the factory before any "
221                "subsystems to guarantee ordering.",
222                len(scheduler._subsystems))
223        self._updater = _FactoryUpdater(self)
224
225        # Eager action creation — pre-create all managed objects so their
226        # NT entries are published immediately.  This lets the dashboard
227        # inspect and tune every action's parameters before robot code
228        # requests them.  Uses required=False so unbound actions get
229        # graceful defaults instead of raising.
230        self._eager_init_active = True
231        for qn, action in self._config.actions.items():
232            try:
233                if action.input_type in (
234                    InputType.BUTTON, InputType.BOOLEAN_TRIGGER
235                ):
236                    self.getButton(qn, required=False)
237                elif action.input_type in (
238                    InputType.ANALOG, InputType.VIRTUAL_ANALOG
239                ):
240                    self.getAnalog(qn, required=False)
241                elif action.input_type == InputType.OUTPUT:
242                    self.getRumbleControl(qn, required=False)
243            except Exception:
244                log.warning(
245                    "Eager creation failed for '%s', will retry on "
246                    "first get*() call", qn, exc_info=True)
247        self._eager_init_active = False
248
249    @property
250    def config(self) -> FullConfig:
251        """The loaded configuration."""
252        return self._config
253
254    # --- Name resolution ---
255
256    def _resolve_name(self, name: str, group: str | None) -> str:
257        """Resolve a name to a fully qualified action name.
258
259        - "group.name" -> "group.name" (dot present, group param ignored)
260        - ("name", group="intake") -> "intake.name"
261        - ("name", group=None) -> "general.name"
262        """
263        if '.' in name:
264            return name
265        g = group if group is not None else "general"
266        return f"{g}.{name}"
267
268    def _find_binding(self, qualified_name: str
269                      ) -> tuple[ControllerState, str] | None:
270        """Find which controller+input is bound to the given action.
271
272        Returns (controller_state, input_name) or None if not bound.
273        """
274        for state in self._controllers.values():
275            if qualified_name in state.action_to_input:
276                return state, state.action_to_input[qualified_name]
277        return None
278
279    def _mark_in_use(self, obj) -> None:
280        """Mark a managed object as in-use (called outside eager init)."""
281        if not self._eager_init_active and hasattr(obj, 'nt_in_use'):
282            obj.nt_in_use = True
283
284    # --- Factory methods ---
285
286    def getButton(
287        self,
288        name: str,
289        group: str | None = None,
290        required: bool = True,
291        default_value: bool = False,
292    ) -> ManagedButton:
293        """Get a managed button for the named action.
294
295        Handles InputType.BUTTON, POV, and BOOLEAN_TRIGGER.
296
297        Args:
298            name: Action name — either qualified "group.name" or short
299                "name".  If no dot is present and group is None, the
300                "general" group is assumed (e.g. "fire" -> "general.fire").
301            group: Explicit group override (used when name has no dot).
302            required: If True, raise KeyError when not found/not bound.
303            default_value: Value returned if unbound.
304
305        Returns:
306            ManagedButton wrapping a Trigger with the configured binding.
307        """
308        qn = self._resolve_name(name, group)
309
310        # Return cached if exists
311        if qn in self._buttons:
312            self._mark_in_use(self._buttons[qn])
313            return self._buttons[qn]
314
315        action = self._config.actions.get(qn)
316        binding = self._find_binding(qn)
317
318        if action is None:
319            if required:
320                raise KeyError(
321                    f"Action '{qn}' not found in config "
322                    f"({self._config_source})")
323            log.warning("Action '%s' not found, returning default", qn)
324            btn = ManagedButton(None, lambda: default_value, default_value)
325            self._buttons[qn] = btn
326            return btn
327
328        if binding is None:
329            if required:
330                raise KeyError(
331                    f"Action '{qn}' exists but is not bound to any "
332                    f"input ({self._config_source})")
333            log.warning("Action '%s' not bound, returning default", qn)
334            btn = ManagedButton(
335                action, lambda: default_value, default_value)
336            self._buttons[qn] = btn
337            return btn
338
339        state, input_name = binding
340
341        # For BOOLEAN_TRIGGER, create a shared mutable threshold ref
342        # so the condition closure reads the latest value each cycle
343        # without needing to rebuild the Trigger.
344        threshold_ref = None
345        if action.input_type == InputType.BOOLEAN_TRIGGER:
346            threshold_ref = [action.threshold]
347        condition = make_button_condition(
348            state, input_name, action, threshold_ref=threshold_ref)
349
350        # Create NT-enabled subclass
351        nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}"
352        klass = make_button_nt_class(nt_path, action)
353        btn = klass(action, condition, default_value)
354        btn._binding_info = format_binding_info(state, input_name)
355        if threshold_ref is not None:
356            btn._threshold_ref = threshold_ref
357        self._buttons[qn] = btn
358        self._mark_in_use(btn)
359        return btn
360
361    def getRawButton(
362        self,
363        name: str,
364        group: str | None = None,
365        required: bool = True,
366    ) -> Callable[[], bool]:
367        """Get a plain callable returning the button state.
368
369        No Trigger wrapping, no command binding — just a function.
370        For BOOLEAN_TRIGGER: returns lambda applying threshold comparison.
371        Results are cached — same name returns the same callable.
372
373        Args:
374            name: Action name — either qualified "group.name" or short
375                "name".  If no dot is present and group is None, the
376                "general" group is assumed (e.g. "fire" -> "general.fire").
377            group: Explicit group override (used when name has no dot).
378            required: If True, raise KeyError when not found/not bound.
379        """
380        qn = self._resolve_name(name, group)
381
382        if qn in self._raw_buttons:
383            return self._raw_buttons[qn]
384
385        action = self._config.actions.get(qn)
386        binding = self._find_binding(qn)
387
388        if action is None or binding is None:
389            if required:
390                raise KeyError(
391                    f"Action '{qn}' not found or not bound "
392                    f"({self._config_source})")
393            log.warning("Action '%s' unavailable, returning False", qn)
394            fn = lambda: False
395            self._raw_buttons[qn] = fn
396            return fn
397
398        state, input_name = binding
399        fn = make_button_condition(state, input_name, action)
400        self._raw_buttons[qn] = fn
401        return fn
402
403    def getAnalog(
404        self,
405        name: str,
406        group: str | None = None,
407        required: bool = True,
408        default_value: float = 0.0,
409    ) -> ManagedAnalog:
410        """Get a managed analog input for the named action.
411
412        Handles InputType.ANALOG.  Full shaping pipeline from config.
413
414        Args:
415            name: Action name — either qualified "group.name" or short
416                "name".  If no dot is present and group is None, the
417                "general" group is assumed (e.g. "speed" -> "general.speed").
418            group: Explicit group override (used when name has no dot).
419            required: If True, raise KeyError when not found/not bound.
420            default_value: Value returned if unbound.
421
422        Returns:
423            ManagedAnalog — callable returning shaped value.
424        """
425        qn = self._resolve_name(name, group)
426
427        if qn in self._analogs:
428            self._mark_in_use(self._analogs[qn])
429            return self._analogs[qn]
430
431        action = self._config.actions.get(qn)
432        binding = self._find_binding(qn)
433
434        if action is None:
435            if required:
436                raise KeyError(
437                    f"Action '{qn}' not found in config "
438                    f"({self._config_source})")
439            log.warning("Action '%s' not found, returning default", qn)
440            analog = ManagedAnalog(
441                None, lambda: default_value, default_value)
442            self._analogs[qn] = analog
443            return analog
444
445        if action.input_type == InputType.VIRTUAL_ANALOG:
446            if binding is None:
447                if not required:
448                    log.warning(
449                        "Action '%s' is VIRTUAL_ANALOG but not bound, "
450                        "returning default", qn)
451                    analog = ManagedAnalog(
452                        action, lambda: default_value, default_value)
453                    self._analogs[qn] = analog
454                    return analog
455                raise KeyError(
456                    f"VIRTUAL_ANALOG action '{qn}' exists but is not "
457                    f"bound to any input ({self._config_source})")
458            state, input_name = binding
459            button_fn = make_button_condition(state, input_name, action)
460            generator = VirtualAnalogGenerator(action, button_fn)
461            self._va_generators.append(generator)
462            nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}"
463            klass = make_analog_nt_class(nt_path, action)
464            analog = klass(action, generator.get_value, default_value)
465            analog._binding_info = format_binding_info(state, input_name)
466            self._analogs[qn] = analog
467            self._mark_in_use(analog)
468            return analog
469
470        if binding is None:
471            if required:
472                raise KeyError(
473                    f"Action '{qn}' exists but is not bound to any "
474                    f"input ({self._config_source})")
475            log.warning("Action '%s' not bound, returning default", qn)
476            analog = ManagedAnalog(
477                action, lambda: default_value, default_value)
478            self._analogs[qn] = analog
479            return analog
480
481        state, input_name = binding
482        accessor = make_axis_accessor(state, input_name)
483
484        # Create NT-enabled subclass
485        nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}"
486        klass = make_analog_nt_class(nt_path, action)
487        analog = klass(action, accessor, default_value)
488        analog._binding_info = format_binding_info(state, input_name)
489        self._analogs[qn] = analog
490        self._mark_in_use(analog)
491        return analog
492
493    def getAnalogRaw(
494        self,
495        name: str,
496        group: str | None = None,
497        required: bool = True,
498        apply_invert: bool = False,
499        apply_deadband: bool = False,
500        apply_scale: bool = False,
501    ) -> Callable[[], float]:
502        """Get a callable returning a selectively-shaped axis value.
503
504        Only applies the requested transformations.  Does not create
505        a managed object — returns a plain callable.
506        Results are cached by (name, apply_invert, apply_deadband,
507        apply_scale) — same args return the same callable.
508
509        Args:
510            name: Action name — either qualified "group.name" or short
511                "name".  If no dot is present and group is None, the
512                "general" group is assumed.
513            group: Explicit group override (used when name has no dot).
514            required: If True, raise KeyError when unavailable.
515            apply_invert: Apply inversion from config.
516            apply_deadband: Apply deadband from config.
517            apply_scale: Apply scale from config.
518        """
519        qn = self._resolve_name(name, group)
520        cache_key = (qn, apply_invert, apply_deadband, apply_scale)
521
522        if cache_key in self._raw_analogs:
523            return self._raw_analogs[cache_key]
524
525        action = self._config.actions.get(qn)
526        binding = self._find_binding(qn)
527
528        if action is None or binding is None:
529            if required:
530                raise KeyError(
531                    f"Action '{qn}' not found or not bound "
532                    f"({self._config_source})")
533            log.warning("Action '%s' unavailable, returning 0.0", qn)
534            fn = lambda: 0.0
535            self._raw_analogs[cache_key] = fn
536            return fn
537
538        state, input_name = binding
539        raw_accessor = make_axis_accessor(state, input_name)
540
541        # Build a selective pipeline using SCALED mode
542        # (applies only the requested transformations)
543        pipeline = build_shaping_pipeline(
544            inversion=action.inversion if apply_invert else False,
545            deadband=action.deadband if apply_deadband else 0.0,
546            trigger_mode=EventTriggerMode.SCALED,
547            scale=action.scale if apply_scale else 1.0,
548            extra={},
549            action_name=qn,
550        )
551
552        def _selective():
553            return pipeline(raw_accessor())
554        self._raw_analogs[cache_key] = _selective
555        return _selective
556
557    def getRumbleControl(
558        self,
559        name: str,
560        group: str | None = None,
561        required: bool = True,
562    ) -> ManagedRumble:
563        """Get a managed rumble output for the named action.
564
565        Args:
566            name: Action name — either qualified "group.name" or short
567                "name".  If no dot is present and group is None, the
568                "general" group is assumed.
569            group: Explicit group override (used when name has no dot).
570            required: If True, raise KeyError when unavailable.
571
572        Returns:
573            ManagedRumble with set(value, timeout) interface.
574        """
575        qn = self._resolve_name(name, group)
576
577        if qn in self._rumbles:
578            self._mark_in_use(self._rumbles[qn])
579            return self._rumbles[qn]
580
581        action = self._config.actions.get(qn)
582        binding = self._find_binding(qn)
583
584        if action is None:
585            if required:
586                raise KeyError(
587                    f"Action '{qn}' not found in config "
588                    f"({self._config_source})")
589            log.warning("Action '%s' not found, returning no-op rumble", qn)
590            rumble = ManagedRumble(None, lambda v: None)
591            self._rumbles[qn] = rumble
592            return rumble
593
594        if binding is None:
595            if required:
596                raise KeyError(
597                    f"Action '{qn}' exists but is not bound to any "
598                    f"input ({self._config_source})")
599            log.warning("Action '%s' not bound, returning no-op rumble", qn)
600            rumble = ManagedRumble(action, lambda v: None)
601            self._rumbles[qn] = rumble
602            return rumble
603
604        state, input_name = binding
605        setter = make_output_setter(state, input_name)
606
607        # Create NT-enabled subclass
608        nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}"
609        klass = make_rumble_nt_class(nt_path, action)
610        rumble = klass(action, setter)
611        rumble._binding_info = format_binding_info(state, input_name)
612        self._rumbles[qn] = rumble
613        self._mark_in_use(rumble)
614        return rumble
615
616    # --- Controller access ---
617
618    def getController(self, port: int) -> wpilib.XboxController | None:
619        """Get the raw XboxController for a given port (for telemetry/logging)."""
620        state = self._controllers.get(port)
621        return state.controller if state is not None else None
622
623    # --- Periodic sync (called automatically by _FactoryUpdater) ---
624
625    def _update(self) -> None:
626        """Sync NT values into managed objects and handle rumble timeouts.
627
628        Called automatically each scheduler cycle by ``_FactoryUpdater``.
629        Because the factory registers before user subsystems, this runs
630        first — so managed inputs have fresh NT values before any
631        subsystem reads them.
632
633        NT values are read once per cycle and compared against cached
634        local values.  If a dashboard change is detected, the local
635        property is updated and the pipeline is rebuilt.  This prevents
636        mid-cycle inconsistency — all reads within a single cycle see
637        the same parameter snapshot.
638
639        Custom NT mappings (from ``mapParamToNtPath()``) are synced
640        after the auto-generated NT sync.  Parameters with custom
641        mappings are skipped during auto-generated sync to prevent
642        conflicts.
643
644        All NT reads (both auto-generated and custom) happen here in
645        the main robot loop — not via ntcore listeners — because
646        listener callbacks fire on a background thread and would race
647        with property reads during pipeline execution.
648        """
649        # Sync NT -> ManagedAnalog properties
650        for analog in self._analogs.values():
651            if analog.action is None:
652                continue
653            sync_analog_nt(analog)
654            analog._sync_custom_maps()
655
656        # Sync NT -> ManagedButton properties (BOOLEAN_TRIGGER threshold)
657        for btn in self._buttons.values():
658            if btn.action is None:
659                continue
660            sync_button_nt(btn)
661            btn._sync_custom_maps()
662
663        # Update rumble timeouts
664        for rumble in self._rumbles.values():
665            rumble.update()
666        
667        # Update virtual analog generators
668        for gen in self._va_generators:
669            gen.update()
670
671    # --- Future: dynamic remapping ---
672
673    def remap(self, action_name: str, port: int,
674              input_name: str) -> None:
675        """Swap the physical input for a named action.
676
677        Not implemented — reserved for future dynamic remapping.
678        """
679        raise NotImplementedError("Dynamic remapping not yet implemented")

Config-driven controller input factory.

Loads configuration and creates managed input objects that wrap wpilib HID devices with config-driven shaping, validation, and NetworkTables publishing.

Args:
    config: Pre-built FullConfig object (for testing).
    config_path: Path to a single YAML with actions + controllers.
    actions_path: Path to actions-only YAML (used with assignments_path).
    assignments_path: Path to controllers-only YAML.
    register_global: Controls singleton registration for get_factory().
        None (default) — register only if no factory exists yet (first-created wins).
        True — always register, replacing any existing factory.
        False — never register (standalone instance, invisible to get_factory()).

Precedence: config > config_path > (actions_path + assignments_path)

InputFactory( config: utils.controller.FullConfig | None = None, config_path: str | pathlib.Path | None = None, actions_path: str | pathlib.Path | None = None, assignments_path: str | pathlib.Path | None = None, register_global: bool | None = None)
136    def __init__(
137        self,
138        config: FullConfig | None = None,
139        config_path: str | Path | None = None,
140        actions_path: str | Path | None = None,
141        assignments_path: str | Path | None = None,
142        register_global: bool | None = None,
143    ):
144        # Load configuration and track source for error messages
145        # Resolve relative paths against the project root (two levels up from
146        # utils/input/) so callers don't depend on cwd.
147        _project_root = Path(__file__).resolve().parent.parent.parent
148        if config is not None:
149            self._config = config
150            self._config_source = "<FullConfig object>"
151            self._config_files: list[Path] = []
152        elif config_path is not None:
153            p = Path(config_path)
154            if not p.is_absolute():
155                p = _project_root / p
156            self._config = load_config(p)
157            self._config_source = str(config_path)
158            self._config_files = [p.resolve()]
159        elif actions_path is not None and assignments_path is not None:
160            actions = load_actions_from_file(actions_path)
161            controllers = load_assignments_from_file(assignments_path)
162            self._config = FullConfig(
163                actions=actions, controllers=controllers)
164            self._config_source = (
165                f"{actions_path} + {assignments_path}")
166            self._config_files = [
167                Path(actions_path).resolve(),
168                Path(assignments_path).resolve()]
169        else:
170            raise ValueError(
171                "Must provide config, config_path, or "
172                "(actions_path + assignments_path)")
173
174        # Create controller instances
175        self._controllers: dict[int, ControllerState] = {}
176        for port, ctrl_config in self._config.controllers.items():
177            self._controllers[port] = ControllerState(port, ctrl_config)
178
179        # Publish raw controllers to NT for dashboard inspection.
180        # Only publish if connected — avoids warning spam in sim
181        # when no joystick is plugged in (Sendable queries axes).
182        for port, state in self._controllers.items():
183            if wpilib.DriverStation.isJoystickConnected(port):
184                wpilib.SmartDashboard.putData(
185                    f"{_NT_BASE}/controllers/raw/{port}",
186                    state.controller)
187
188        # Publish bindings info and config metadata to NT
189        publish_bindings_nt(_NT_BASE, self._controllers)
190        publish_config_metadata(_NT_BASE, self._config_files)
191
192        # Caches — all factory methods return the same object for a name
193        self._buttons: dict[str, ManagedButton] = {}
194        self._analogs: dict[str, ManagedAnalog] = {}
195        self._rumbles: dict[str, ManagedRumble] = {}
196        self._raw_buttons: dict[str, Callable[[], bool]] = {}
197        self._raw_analogs: dict[tuple, Callable[[], float]] = {}
198        self._va_generators: list[VirtualAnalogGenerator] = []
199
200        # Register as the active factory for get_factory().
201        # register_global=None (default): register only if no factory exists yet.
202        # register_global=True: always register (override existing).
203        # register_global=False: never register (standalone factory).
204        global _active_factory
205        if register_global is True:
206            _active_factory = self
207        elif register_global is None and _active_factory is None:
208            _active_factory = self
209
210        # Register an internal subsystem that syncs NT values each cycle.
211        # The CommandScheduler iterates subsystems in registration order
212        # (dict insertion order in Python 3.7+).  Since the factory is
213        # created in robotInit before user subsystems, this updater is
214        # registered first and its periodic() runs before all others.
215        scheduler = commands2.CommandScheduler.getInstance()
216        if scheduler._subsystems:
217            log.warning(
218                "InputFactory created after %d other subsystem(s) — "
219                "NT sync may run after those subsystems read stale "
220                "values for one cycle. Create the factory before any "
221                "subsystems to guarantee ordering.",
222                len(scheduler._subsystems))
223        self._updater = _FactoryUpdater(self)
224
225        # Eager action creation — pre-create all managed objects so their
226        # NT entries are published immediately.  This lets the dashboard
227        # inspect and tune every action's parameters before robot code
228        # requests them.  Uses required=False so unbound actions get
229        # graceful defaults instead of raising.
230        self._eager_init_active = True
231        for qn, action in self._config.actions.items():
232            try:
233                if action.input_type in (
234                    InputType.BUTTON, InputType.BOOLEAN_TRIGGER
235                ):
236                    self.getButton(qn, required=False)
237                elif action.input_type in (
238                    InputType.ANALOG, InputType.VIRTUAL_ANALOG
239                ):
240                    self.getAnalog(qn, required=False)
241                elif action.input_type == InputType.OUTPUT:
242                    self.getRumbleControl(qn, required=False)
243            except Exception:
244                log.warning(
245                    "Eager creation failed for '%s', will retry on "
246                    "first get*() call", qn, exc_info=True)
247        self._eager_init_active = False
config: utils.controller.FullConfig
249    @property
250    def config(self) -> FullConfig:
251        """The loaded configuration."""
252        return self._config

The loaded configuration.

def getButton( self, name: str, group: str | None = None, required: bool = True, default_value: bool = False) -> ManagedButton:
286    def getButton(
287        self,
288        name: str,
289        group: str | None = None,
290        required: bool = True,
291        default_value: bool = False,
292    ) -> ManagedButton:
293        """Get a managed button for the named action.
294
295        Handles InputType.BUTTON, POV, and BOOLEAN_TRIGGER.
296
297        Args:
298            name: Action name — either qualified "group.name" or short
299                "name".  If no dot is present and group is None, the
300                "general" group is assumed (e.g. "fire" -> "general.fire").
301            group: Explicit group override (used when name has no dot).
302            required: If True, raise KeyError when not found/not bound.
303            default_value: Value returned if unbound.
304
305        Returns:
306            ManagedButton wrapping a Trigger with the configured binding.
307        """
308        qn = self._resolve_name(name, group)
309
310        # Return cached if exists
311        if qn in self._buttons:
312            self._mark_in_use(self._buttons[qn])
313            return self._buttons[qn]
314
315        action = self._config.actions.get(qn)
316        binding = self._find_binding(qn)
317
318        if action is None:
319            if required:
320                raise KeyError(
321                    f"Action '{qn}' not found in config "
322                    f"({self._config_source})")
323            log.warning("Action '%s' not found, returning default", qn)
324            btn = ManagedButton(None, lambda: default_value, default_value)
325            self._buttons[qn] = btn
326            return btn
327
328        if binding is None:
329            if required:
330                raise KeyError(
331                    f"Action '{qn}' exists but is not bound to any "
332                    f"input ({self._config_source})")
333            log.warning("Action '%s' not bound, returning default", qn)
334            btn = ManagedButton(
335                action, lambda: default_value, default_value)
336            self._buttons[qn] = btn
337            return btn
338
339        state, input_name = binding
340
341        # For BOOLEAN_TRIGGER, create a shared mutable threshold ref
342        # so the condition closure reads the latest value each cycle
343        # without needing to rebuild the Trigger.
344        threshold_ref = None
345        if action.input_type == InputType.BOOLEAN_TRIGGER:
346            threshold_ref = [action.threshold]
347        condition = make_button_condition(
348            state, input_name, action, threshold_ref=threshold_ref)
349
350        # Create NT-enabled subclass
351        nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}"
352        klass = make_button_nt_class(nt_path, action)
353        btn = klass(action, condition, default_value)
354        btn._binding_info = format_binding_info(state, input_name)
355        if threshold_ref is not None:
356            btn._threshold_ref = threshold_ref
357        self._buttons[qn] = btn
358        self._mark_in_use(btn)
359        return btn

Get a managed button for the named action.

Handles InputType.BUTTON, POV, and BOOLEAN_TRIGGER.

Args: name: Action name — either qualified "group.name" or short "name". If no dot is present and group is None, the "general" group is assumed (e.g. "fire" -> "general.fire"). group: Explicit group override (used when name has no dot). required: If True, raise KeyError when not found/not bound. default_value: Value returned if unbound.

Returns: ManagedButton wrapping a Trigger with the configured binding.

def getRawButton( self, name: str, group: str | None = None, required: bool = True) -> Callable[[], bool]:
361    def getRawButton(
362        self,
363        name: str,
364        group: str | None = None,
365        required: bool = True,
366    ) -> Callable[[], bool]:
367        """Get a plain callable returning the button state.
368
369        No Trigger wrapping, no command binding — just a function.
370        For BOOLEAN_TRIGGER: returns lambda applying threshold comparison.
371        Results are cached — same name returns the same callable.
372
373        Args:
374            name: Action name — either qualified "group.name" or short
375                "name".  If no dot is present and group is None, the
376                "general" group is assumed (e.g. "fire" -> "general.fire").
377            group: Explicit group override (used when name has no dot).
378            required: If True, raise KeyError when not found/not bound.
379        """
380        qn = self._resolve_name(name, group)
381
382        if qn in self._raw_buttons:
383            return self._raw_buttons[qn]
384
385        action = self._config.actions.get(qn)
386        binding = self._find_binding(qn)
387
388        if action is None or binding is None:
389            if required:
390                raise KeyError(
391                    f"Action '{qn}' not found or not bound "
392                    f"({self._config_source})")
393            log.warning("Action '%s' unavailable, returning False", qn)
394            fn = lambda: False
395            self._raw_buttons[qn] = fn
396            return fn
397
398        state, input_name = binding
399        fn = make_button_condition(state, input_name, action)
400        self._raw_buttons[qn] = fn
401        return fn

Get a plain callable returning the button state.

No Trigger wrapping, no command binding — just a function. For BOOLEAN_TRIGGER: returns lambda applying threshold comparison. Results are cached — same name returns the same callable.

Args: name: Action name — either qualified "group.name" or short "name". If no dot is present and group is None, the "general" group is assumed (e.g. "fire" -> "general.fire"). group: Explicit group override (used when name has no dot). required: If True, raise KeyError when not found/not bound.

def getAnalog( self, name: str, group: str | None = None, required: bool = True, default_value: float = 0.0) -> ManagedAnalog:
403    def getAnalog(
404        self,
405        name: str,
406        group: str | None = None,
407        required: bool = True,
408        default_value: float = 0.0,
409    ) -> ManagedAnalog:
410        """Get a managed analog input for the named action.
411
412        Handles InputType.ANALOG.  Full shaping pipeline from config.
413
414        Args:
415            name: Action name — either qualified "group.name" or short
416                "name".  If no dot is present and group is None, the
417                "general" group is assumed (e.g. "speed" -> "general.speed").
418            group: Explicit group override (used when name has no dot).
419            required: If True, raise KeyError when not found/not bound.
420            default_value: Value returned if unbound.
421
422        Returns:
423            ManagedAnalog — callable returning shaped value.
424        """
425        qn = self._resolve_name(name, group)
426
427        if qn in self._analogs:
428            self._mark_in_use(self._analogs[qn])
429            return self._analogs[qn]
430
431        action = self._config.actions.get(qn)
432        binding = self._find_binding(qn)
433
434        if action is None:
435            if required:
436                raise KeyError(
437                    f"Action '{qn}' not found in config "
438                    f"({self._config_source})")
439            log.warning("Action '%s' not found, returning default", qn)
440            analog = ManagedAnalog(
441                None, lambda: default_value, default_value)
442            self._analogs[qn] = analog
443            return analog
444
445        if action.input_type == InputType.VIRTUAL_ANALOG:
446            if binding is None:
447                if not required:
448                    log.warning(
449                        "Action '%s' is VIRTUAL_ANALOG but not bound, "
450                        "returning default", qn)
451                    analog = ManagedAnalog(
452                        action, lambda: default_value, default_value)
453                    self._analogs[qn] = analog
454                    return analog
455                raise KeyError(
456                    f"VIRTUAL_ANALOG action '{qn}' exists but is not "
457                    f"bound to any input ({self._config_source})")
458            state, input_name = binding
459            button_fn = make_button_condition(state, input_name, action)
460            generator = VirtualAnalogGenerator(action, button_fn)
461            self._va_generators.append(generator)
462            nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}"
463            klass = make_analog_nt_class(nt_path, action)
464            analog = klass(action, generator.get_value, default_value)
465            analog._binding_info = format_binding_info(state, input_name)
466            self._analogs[qn] = analog
467            self._mark_in_use(analog)
468            return analog
469
470        if binding is None:
471            if required:
472                raise KeyError(
473                    f"Action '{qn}' exists but is not bound to any "
474                    f"input ({self._config_source})")
475            log.warning("Action '%s' not bound, returning default", qn)
476            analog = ManagedAnalog(
477                action, lambda: default_value, default_value)
478            self._analogs[qn] = analog
479            return analog
480
481        state, input_name = binding
482        accessor = make_axis_accessor(state, input_name)
483
484        # Create NT-enabled subclass
485        nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}"
486        klass = make_analog_nt_class(nt_path, action)
487        analog = klass(action, accessor, default_value)
488        analog._binding_info = format_binding_info(state, input_name)
489        self._analogs[qn] = analog
490        self._mark_in_use(analog)
491        return analog

Get a managed analog input for the named action.

Handles InputType.ANALOG and InputType.VIRTUAL_ANALOG. Full shaping pipeline from config.

Args: name: Action name — either qualified "group.name" or short "name". If no dot is present and group is None, the "general" group is assumed (e.g. "speed" -> "general.speed"). group: Explicit group override (used when name has no dot). required: If True, raise KeyError when not found/not bound. default_value: Value returned if unbound.

Returns: ManagedAnalog — callable returning shaped value.

def getAnalogRaw( self, name: str, group: str | None = None, required: bool = True, apply_invert: bool = False, apply_deadband: bool = False, apply_scale: bool = False) -> Callable[[], float]:
493    def getAnalogRaw(
494        self,
495        name: str,
496        group: str | None = None,
497        required: bool = True,
498        apply_invert: bool = False,
499        apply_deadband: bool = False,
500        apply_scale: bool = False,
501    ) -> Callable[[], float]:
502        """Get a callable returning a selectively-shaped axis value.
503
504        Only applies the requested transformations.  Does not create
505        a managed object — returns a plain callable.
506        Results are cached by (name, apply_invert, apply_deadband,
507        apply_scale) — same args return the same callable.
508
509        Args:
510            name: Action name — either qualified "group.name" or short
511                "name".  If no dot is present and group is None, the
512                "general" group is assumed.
513            group: Explicit group override (used when name has no dot).
514            required: If True, raise KeyError when unavailable.
515            apply_invert: Apply inversion from config.
516            apply_deadband: Apply deadband from config.
517            apply_scale: Apply scale from config.
518        """
519        qn = self._resolve_name(name, group)
520        cache_key = (qn, apply_invert, apply_deadband, apply_scale)
521
522        if cache_key in self._raw_analogs:
523            return self._raw_analogs[cache_key]
524
525        action = self._config.actions.get(qn)
526        binding = self._find_binding(qn)
527
528        if action is None or binding is None:
529            if required:
530                raise KeyError(
531                    f"Action '{qn}' not found or not bound "
532                    f"({self._config_source})")
533            log.warning("Action '%s' unavailable, returning 0.0", qn)
534            fn = lambda: 0.0
535            self._raw_analogs[cache_key] = fn
536            return fn
537
538        state, input_name = binding
539        raw_accessor = make_axis_accessor(state, input_name)
540
541        # Build a selective pipeline using SCALED mode
542        # (applies only the requested transformations)
543        pipeline = build_shaping_pipeline(
544            inversion=action.inversion if apply_invert else False,
545            deadband=action.deadband if apply_deadband else 0.0,
546            trigger_mode=EventTriggerMode.SCALED,
547            scale=action.scale if apply_scale else 1.0,
548            extra={},
549            action_name=qn,
550        )
551
552        def _selective():
553            return pipeline(raw_accessor())
554        self._raw_analogs[cache_key] = _selective
555        return _selective

Get a callable returning a selectively-shaped axis value.

Only applies the requested transformations. Does not create a managed object — returns a plain callable. Results are cached by (name, apply_invert, apply_deadband, apply_scale) — same args return the same callable.

Args: name: Action name — either qualified "group.name" or short "name". If no dot is present and group is None, the "general" group is assumed. group: Explicit group override (used when name has no dot). required: If True, raise KeyError when unavailable. apply_invert: Apply inversion from config. apply_deadband: Apply deadband from config. apply_scale: Apply scale from config.

def getRumbleControl( self, name: str, group: str | None = None, required: bool = True) -> ManagedRumble:
557    def getRumbleControl(
558        self,
559        name: str,
560        group: str | None = None,
561        required: bool = True,
562    ) -> ManagedRumble:
563        """Get a managed rumble output for the named action.
564
565        Args:
566            name: Action name — either qualified "group.name" or short
567                "name".  If no dot is present and group is None, the
568                "general" group is assumed.
569            group: Explicit group override (used when name has no dot).
570            required: If True, raise KeyError when unavailable.
571
572        Returns:
573            ManagedRumble with set(value, timeout) interface.
574        """
575        qn = self._resolve_name(name, group)
576
577        if qn in self._rumbles:
578            self._mark_in_use(self._rumbles[qn])
579            return self._rumbles[qn]
580
581        action = self._config.actions.get(qn)
582        binding = self._find_binding(qn)
583
584        if action is None:
585            if required:
586                raise KeyError(
587                    f"Action '{qn}' not found in config "
588                    f"({self._config_source})")
589            log.warning("Action '%s' not found, returning no-op rumble", qn)
590            rumble = ManagedRumble(None, lambda v: None)
591            self._rumbles[qn] = rumble
592            return rumble
593
594        if binding is None:
595            if required:
596                raise KeyError(
597                    f"Action '{qn}' exists but is not bound to any "
598                    f"input ({self._config_source})")
599            log.warning("Action '%s' not bound, returning no-op rumble", qn)
600            rumble = ManagedRumble(action, lambda v: None)
601            self._rumbles[qn] = rumble
602            return rumble
603
604        state, input_name = binding
605        setter = make_output_setter(state, input_name)
606
607        # Create NT-enabled subclass
608        nt_path = f"{_NT_BASE}/actions/{action.group}/{action.name}"
609        klass = make_rumble_nt_class(nt_path, action)
610        rumble = klass(action, setter)
611        rumble._binding_info = format_binding_info(state, input_name)
612        self._rumbles[qn] = rumble
613        self._mark_in_use(rumble)
614        return rumble

Get a managed rumble output for the named action.

Args: name: Action name — either qualified "group.name" or short "name". If no dot is present and group is None, the "general" group is assumed. group: Explicit group override (used when name has no dot). required: If True, raise KeyError when unavailable.

Returns: ManagedRumble with set(value, timeout) interface.

def getController(self, port: int) -> wpilib._wpilib.XboxController | None:
618    def getController(self, port: int) -> wpilib.XboxController | None:
619        """Get the raw XboxController for a given port (for telemetry/logging)."""
620        state = self._controllers.get(port)
621        return state.controller if state is not None else None

Get the raw XboxController for a given port (for telemetry/logging).

def remap(self, action_name: str, port: int, input_name: str) -> None:
673    def remap(self, action_name: str, port: int,
674              input_name: str) -> None:
675        """Swap the physical input for a named action.
676
677        Not implemented — reserved for future dynamic remapping.
678        """
679        raise NotImplementedError("Dynamic remapping not yet implemented")

Swap the physical input for a named action.

Not implemented — reserved for future dynamic remapping.

def get_factory() -> InputFactory:
def get_factory() -> "InputFactory":
    """Fetch the currently active InputFactory.

    Lets any module (subsystems, commands, etc.) obtain managed inputs
    without the factory being threaded through constructors::

        from utils.input import get_factory
        rumble = get_factory().getRumbleControl("feedback.rumble")

    Returns:
        The module-level active factory.

    Raises:
        RuntimeError: If no factory has been created yet.
    """
    active = _active_factory
    if active is not None:
        return active
    raise RuntimeError(
        "InputFactory not initialized — create one in robotInit "
        "before calling get_factory()")

Return the active InputFactory instance.

Allows any module (subsystems, commands, etc.) to fetch managed inputs without having the factory passed through constructors::

from utils.input import get_factory
rumble = get_factory().getRumbleControl("feedback.rumble")

Raises RuntimeError if no factory has been created yet.

class ManagedAnalog(utils.input._nt_mapping.NtMappingMixin):
 54class ManagedAnalog(NtMappingMixin):
 55    """A managed analog input with full shaping pipeline.
 56
 57    Args:
 58        action: The ActionDefinition this analog represents (may be None).
 59        accessor: Callable returning the raw axis value each cycle.
 60        default_value: Value returned when no accessor is bound.
 61    """
 62
 63    _PARAM_TYPES: dict[str, type] = {
 64        "deadband": float,
 65        "inversion": bool,
 66        "scale": float,
 67        "slew_rate": float,
 68    }
 69
 70    def __init__(
 71        self,
 72        action: ActionDefinition | None,
 73        accessor: Callable[[], float],
 74        default_value: float = 0.0,
 75    ):
 76        self._action = action
 77        self._accessor = accessor
 78        self._default_value = default_value
 79
 80        # Mutable properties — initialized from action or defaults
 81        if action is not None:
 82            self._deadband = action.deadband
 83            self._inversion = action.inversion
 84            self._scale = action.scale
 85            self._slew_rate = action.slew_rate
 86            self._trigger_mode = action.trigger_mode
 87            self._extra = action.extra or {}
 88        else:
 89            self._deadband = 0.0
 90            self._inversion = False
 91            self._scale = 1.0
 92            self._slew_rate = 0.0
 93            self._trigger_mode = EventTriggerMode.RAW
 94            self._extra = {}
 95
 96        # Binding info — set by the factory after construction.
 97        # Format: "ControllerName.input_name (channel)"
 98        self._binding_info: str = "unbound"
 99
100        self._init_nt_mapping()
101
102        self._pipeline: Callable[[float], float] = lambda x: x
103        self._slew_limiter = None
104        self._rebuild_pipeline()
105        self._rebuild_slew_limiter()
106
107    # --- Properties with pipeline rebuild ---
108
109    @property
110    def deadband(self) -> float:
111        return self._deadband
112
113    @deadband.setter
114    def deadband(self, value: float) -> None:
115        if self._deadband != value:
116            self._deadband = value
117            self._rebuild_pipeline()
118
119    @property
120    def inversion(self) -> bool:
121        return self._inversion
122
123    @inversion.setter
124    def inversion(self, value: bool) -> None:
125        if self._inversion != value:
126            self._inversion = value
127            self._rebuild_pipeline()
128
129    @property
130    def scale(self) -> float:
131        return self._scale
132
133    @scale.setter
134    def scale(self, value: float) -> None:
135        if self._scale != value:
136            self._scale = value
137            self._rebuild_pipeline()
138
139    @property
140    def slew_rate(self) -> float:
141        """Max output change rate in units/sec.  0 = disabled.
142
143        Applies symmetrically (same limit up and down) unless
144        ``extra["negative_slew_rate"]`` is set on the action.
145        """
146        return self._slew_rate
147
148    @slew_rate.setter
149    def slew_rate(self, value: float) -> None:
150        if self._slew_rate != value:
151            self._slew_rate = value
152            self._rebuild_slew_limiter()
153
154    @property
155    def action(self) -> ActionDefinition | None:
156        return self._action
157
158    # --- Value access ---
159
160    def get(self) -> float:
161        """Return the shaped value through the full pipeline + slew limit."""
162        shaped = self._pipeline(self._accessor())
163        if self._slew_limiter is not None:
164            shaped = self._slew_limiter.calculate(shaped)
165        self._last_slew_output = shaped
166        return shaped
167
168    def getRaw(self) -> float:
169        """Return the unmodified raw HID value."""
170        return self._accessor()
171
172    def __call__(self) -> float:
173        """Alias for get() — makes ManagedAnalog a Callable[[], float]."""
174        return self.get()
175
176    # --- Internal ---
177
178    def __str__(self) -> str:
179        name = self._action.qualified_name if self._action else "unbound"
180        mode = self._trigger_mode.value
181        parts = [f"ManagedAnalog('{name}'"]
182        parts.append(f"bind={self._binding_info}")
183        parts.append(f"mode={mode}")
184        parts.append(f"pipeline={self._pipeline}")
185        if self._slew_rate > 0:
186            neg = self._extra.get("negative_slew_rate", -self._slew_rate)
187            parts.append(f"slew_rate={self._slew_rate}/{neg}")
188        parts_str = ", ".join(parts)
189        return f"{parts_str})"
190
191    def _rebuild_pipeline(self) -> None:
192        """Rebuild the shaping closure from current properties."""
193        self._pipeline = build_shaping_pipeline(
194            inversion=self._inversion,
195            deadband=self._deadband,
196            trigger_mode=self._trigger_mode,
197            scale=self._scale,
198            extra=self._extra,
199            action_name=(
200                self._action.qualified_name if self._action else ""),
201        )
202
203    def _rebuild_slew_limiter(self) -> None:
204        """Recreate the SlewRateLimiter when rate changes.
205
206        SlewRateLimiter rate limits are constructor params, so a new
207        instance is needed when the rate changes.  Setting slew_rate
208        to 0 disables the limiter.
209
210        The negative (decreasing) rate defaults to ``-slew_rate`` but
211        can be overridden via ``extra["negative_slew_rate"]`` in the
212        action's YAML config for asymmetric limiting.
213
214        Preserves the last output value so changing the rate at runtime
215        doesn't cause a transient glitch (the new limiter starts from
216        where the old one left off rather than resetting to 0).
217        """
218        if self._slew_rate <= 0:
219            self._slew_limiter = None
220            return
221        try:
222            from wpimath.filter import SlewRateLimiter
223            neg_rate = self._extra.get(
224                "negative_slew_rate", -self._slew_rate)
225            initial = getattr(self, '_last_slew_output', 0.0)
226            self._slew_limiter = SlewRateLimiter(
227                self._slew_rate, neg_rate, initial)
228        except ImportError:
229            # Graceful degradation in test environments
230            self._slew_limiter = None
231
232    def _rebind(self, accessor: Callable[[], float]) -> None:
233        """Swap the raw accessor for dynamic remapping."""
234        self._accessor = accessor

A managed analog input with full shaping pipeline.

Args: action: The ActionDefinition this analog represents (may be None). accessor: Callable returning the raw axis value each cycle. default_value: Value returned when no accessor is bound.

ManagedAnalog( action: utils.controller.ActionDefinition | None, accessor: Callable[[], float], default_value: float = 0.0)
 70    def __init__(
 71        self,
 72        action: ActionDefinition | None,
 73        accessor: Callable[[], float],
 74        default_value: float = 0.0,
 75    ):
 76        self._action = action
 77        self._accessor = accessor
 78        self._default_value = default_value
 79
 80        # Mutable properties — initialized from action or defaults
 81        if action is not None:
 82            self._deadband = action.deadband
 83            self._inversion = action.inversion
 84            self._scale = action.scale
 85            self._slew_rate = action.slew_rate
 86            self._trigger_mode = action.trigger_mode
 87            self._extra = action.extra or {}
 88        else:
 89            self._deadband = 0.0
 90            self._inversion = False
 91            self._scale = 1.0
 92            self._slew_rate = 0.0
 93            self._trigger_mode = EventTriggerMode.RAW
 94            self._extra = {}
 95
 96        # Binding info — set by the factory after construction.
 97        # Format: "ControllerName.input_name (channel)"
 98        self._binding_info: str = "unbound"
 99
100        self._init_nt_mapping()
101
102        self._pipeline: Callable[[float], float] = lambda x: x
103        self._slew_limiter = None
104        self._rebuild_pipeline()
105        self._rebuild_slew_limiter()
deadband: float
109    @property
110    def deadband(self) -> float:
111        return self._deadband
inversion: bool
119    @property
120    def inversion(self) -> bool:
121        return self._inversion
scale: float
129    @property
130    def scale(self) -> float:
131        return self._scale
slew_rate: float
139    @property
140    def slew_rate(self) -> float:
141        """Max output change rate in units/sec.  0 = disabled.
142
143        Applies symmetrically (same limit up and down) unless
144        ``extra["negative_slew_rate"]`` is set on the action.
145        """
146        return self._slew_rate

Max output change rate in units/sec. 0 = disabled.

Applies symmetrically (same limit up and down) unless extra["negative_slew_rate"] is set on the action.

action: utils.controller.ActionDefinition | None
154    @property
155    def action(self) -> ActionDefinition | None:
156        return self._action
def get(self) -> float:
160    def get(self) -> float:
161        """Return the shaped value through the full pipeline + slew limit."""
162        shaped = self._pipeline(self._accessor())
163        if self._slew_limiter is not None:
164            shaped = self._slew_limiter.calculate(shaped)
165        self._last_slew_output = shaped
166        return shaped

Return the shaped value through the full pipeline + slew limit.

def getRaw(self) -> float:
168    def getRaw(self) -> float:
169        """Return the unmodified raw HID value."""
170        return self._accessor()

Return the unmodified raw HID value.

class ManagedButton(utils.input._nt_mapping.NtMappingMixin):
 25class ManagedButton(NtMappingMixin):
 26    """A managed boolean input backed by a Trigger.
 27
 28    Args:
 29        action: The ActionDefinition this button represents (may be None
 30                for unbound/default buttons).
 31        condition: Callable returning the current boolean state.
 32        default_value: Value returned when no condition is bound.
 33    """
 34
 35    _PARAM_TYPES: dict[str, type] = {
 36        "threshold": float,
 37    }
 38
 39    def __init__(
 40        self,
 41        action: ActionDefinition | None,
 42        condition: Callable[[], bool],
 43        default_value: bool = False,
 44    ):
 45        self._action = action
 46        self._condition = condition
 47        self._default_value = default_value
 48        self._trigger = Trigger(condition)
 49
 50        # Mutable threshold ref for BOOLEAN_TRIGGER live-tuning.
 51        # The factory sets this to a shared list that the condition
 52        # closure also reads, so threshold changes take effect
 53        # without rebuilding the Trigger or losing command bindings.
 54        self._threshold_ref: list[float] | None = None
 55
 56        # Binding info — set by the factory after construction.
 57        # Format: "ControllerName.input_name (channel)"
 58        self._binding_info: str = "unbound"
 59
 60        self._init_nt_mapping()
 61
 62    # --- Threshold property (BOOLEAN_TRIGGER live-tuning) ---
 63
 64    @property
 65    def threshold(self) -> float:
 66        """Current threshold value for BOOLEAN_TRIGGER conditions."""
 67        if self._threshold_ref is not None:
 68            return self._threshold_ref[0]
 69        return self._action.threshold if self._action is not None else 0.5
 70
 71    @threshold.setter
 72    def threshold(self, value: float) -> None:
 73        if self._threshold_ref is not None:
 74            self._threshold_ref[0] = float(value)
 75
 76    # --- Mixin overrides for threshold-specific behavior ---
 77
 78    def _validate_param(self, param: str) -> str | None:
 79        """Reject threshold mapping on non-BOOLEAN_TRIGGER actions."""
 80        if (param == "threshold"
 81                and self._action is not None
 82                and self._action.input_type != InputType.BOOLEAN_TRIGGER):
 83            return (
 84                f"Cannot map 'threshold' on a non-BOOLEAN_TRIGGER action "
 85                f"('{self._action.qualified_name}' is "
 86                f"{self._action.input_type.value})")
 87        return None
 88
 89    def _get_param_value(self, param: str):
 90        """Read threshold from the property (which reads _threshold_ref)."""
 91        if param == "threshold":
 92            return self.threshold
 93        return getattr(self, param)
 94
 95    def _set_param_value(self, param: str, value) -> None:
 96        """Write threshold via the property and sync NT if available."""
 97        if param == "threshold":
 98            self.threshold = value
 99            if hasattr(self, 'nt_threshold'):
100                self.nt_threshold = value
101        else:
102            setattr(self, param, value)
103
104    # --- Drop-in Trigger binding methods ---
105
106    def onTrue(self, command: commands2.Command) -> "ManagedButton":
107        """Schedule command when condition becomes True."""
108        self._trigger.onTrue(command)
109        return self
110
111    def onFalse(self, command: commands2.Command) -> "ManagedButton":
112        """Schedule command when condition becomes False."""
113        self._trigger.onFalse(command)
114        return self
115
116    def whileTrue(self, command: commands2.Command) -> "ManagedButton":
117        """Schedule command while condition is True, cancel on False."""
118        self._trigger.whileTrue(command)
119        return self
120
121    def whileFalse(self, command: commands2.Command) -> "ManagedButton":
122        """Schedule command while condition is False, cancel on True."""
123        self._trigger.whileFalse(command)
124        return self
125
126    def toggleOnTrue(self, command: commands2.Command) -> "ManagedButton":
127        """Toggle command scheduling each time condition becomes True."""
128        self._trigger.toggleOnTrue(command)
129        return self
130
131    _BINDING_MAP = {
132        EventTriggerMode.ON_TRUE: "onTrue",
133        EventTriggerMode.ON_FALSE: "onFalse",
134        EventTriggerMode.WHILE_TRUE: "whileTrue",
135        EventTriggerMode.WHILE_FALSE: "whileFalse",
136        EventTriggerMode.TOGGLE_ON_TRUE: "toggleOnTrue",
137    }
138
139    def bind(
140        self,
141        command: commands2.Command,
142        mode: EventTriggerMode | None = None,
143    ) -> "ManagedButton":
144        """Bind a command using the configured or overridden trigger mode.
145
146        Args:
147            command: The command to bind.
148            mode: Override the YAML-configured trigger mode.  When None
149                (default), uses the action's configured mode.  Pass an
150                explicit EventTriggerMode to ignore the config, e.g.
151                ``btn.bind(cmd, mode=EventTriggerMode.WHILE_TRUE)``.
152
153        Falls back to onTrue when no action is set and no override given.
154        """
155        if mode is None:
156            mode = (self._action.trigger_mode
157                    if self._action is not None
158                    else EventTriggerMode.ON_TRUE)
159        method_name = self._BINDING_MAP.get(mode, "onTrue")
160        return getattr(self, method_name)(command)
161
162    # --- State access ---
163
164    def get(self) -> bool:
165        """Return the current boolean state."""
166        return self._condition()
167
168    @property
169    def trigger(self) -> Trigger:
170        """Access the underlying Trigger for advanced composition."""
171        return self._trigger
172
173    @property
174    def action(self) -> ActionDefinition | None:
175        """The action definition, if any."""
176        return self._action
177
178    def __str__(self) -> str:
179        name = self._action.qualified_name if self._action else "unbound"
180        parts = [f"ManagedButton('{name}'"]
181        parts.append(f"bind={self._binding_info}")
182        if self._action is not None:
183            parts.append(f"type={self._action.input_type.value}")
184            parts.append(f"mode={self._action.trigger_mode.value}")
185            if self._action.input_type == InputType.BOOLEAN_TRIGGER:
186                parts.append(f"threshold={self.threshold}")
187        parts_str = ", ".join(parts)
188        return f"{parts_str})"
189
190    # --- Remapping support ---
191
192    def _rebind(self, condition: Callable[[], bool]) -> None:
193        """Swap the condition and rebuild the internal Trigger.
194
195        Used by factory.remap() for dynamic remapping.
196        Note: existing command bindings on the old Trigger will no
197        longer fire — callers must re-register bindings after remap.
198        """
199        self._condition = condition
200        self._trigger = Trigger(condition)

A managed boolean input backed by a Trigger.

Args: action: The ActionDefinition this button represents (may be None for unbound/default buttons). condition: Callable returning the current boolean state. default_value: Value returned when no condition is bound.

ManagedButton( action: utils.controller.ActionDefinition | None, condition: Callable[[], bool], default_value: bool = False)
39    def __init__(
40        self,
41        action: ActionDefinition | None,
42        condition: Callable[[], bool],
43        default_value: bool = False,
44    ):
45        self._action = action
46        self._condition = condition
47        self._default_value = default_value
48        self._trigger = Trigger(condition)
49
50        # Mutable threshold ref for BOOLEAN_TRIGGER live-tuning.
51        # The factory sets this to a shared list that the condition
52        # closure also reads, so threshold changes take effect
53        # without rebuilding the Trigger or losing command bindings.
54        self._threshold_ref: list[float] | None = None
55
56        # Binding info — set by the factory after construction.
57        # Format: "ControllerName.input_name (channel)"
58        self._binding_info: str = "unbound"
59
60        self._init_nt_mapping()
threshold: float
64    @property
65    def threshold(self) -> float:
66        """Current threshold value for BOOLEAN_TRIGGER conditions."""
67        if self._threshold_ref is not None:
68            return self._threshold_ref[0]
69        return self._action.threshold if self._action is not None else 0.5

Current threshold value for BOOLEAN_TRIGGER conditions.

def onTrue( self, command: commands2.command.Command) -> ManagedButton:
    def onTrue(self, command: commands2.Command) -> "ManagedButton":
        """Schedule command when condition becomes True.

        Forwards ``command`` to the wrapped Trigger's ``onTrue``.

        Returns:
            This ManagedButton, so binding calls can be chained.
        """
        self._trigger.onTrue(command)
        return self

Schedule command when condition becomes True.

def onFalse( self, command: commands2.command.Command) -> ManagedButton:
    def onFalse(self, command: commands2.Command) -> "ManagedButton":
        """Schedule command when condition becomes False.

        Forwards ``command`` to the wrapped Trigger's ``onFalse``.

        Returns:
            This ManagedButton, so binding calls can be chained.
        """
        self._trigger.onFalse(command)
        return self

Schedule command when condition becomes False.

def whileTrue( self, command: commands2.command.Command) -> ManagedButton:
    def whileTrue(self, command: commands2.Command) -> "ManagedButton":
        """Schedule command while condition is True, cancel on False.

        Forwards ``command`` to the wrapped Trigger's ``whileTrue``.

        Returns:
            This ManagedButton, so binding calls can be chained.
        """
        self._trigger.whileTrue(command)
        return self

Schedule command while condition is True, cancel on False.

def whileFalse( self, command: commands2.command.Command) -> ManagedButton:
    def whileFalse(self, command: commands2.Command) -> "ManagedButton":
        """Schedule command while condition is False, cancel on True.

        Forwards ``command`` to the wrapped Trigger's ``whileFalse``.

        Returns:
            This ManagedButton, so binding calls can be chained.
        """
        self._trigger.whileFalse(command)
        return self

Schedule command while condition is False, cancel on True.

def toggleOnTrue( self, command: commands2.command.Command) -> ManagedButton:
    def toggleOnTrue(self, command: commands2.Command) -> "ManagedButton":
        """Toggle command scheduling each time condition becomes True.

        Forwards ``command`` to the wrapped Trigger's ``toggleOnTrue``.

        Returns:
            This ManagedButton, so binding calls can be chained.
        """
        self._trigger.toggleOnTrue(command)
        return self

Toggle command scheduling each time condition becomes True.

def bind( self, command: commands2.command.Command, mode: utils.controller.EventTriggerMode | None = None) -> ManagedButton:
139    def bind(
140        self,
141        command: commands2.Command,
142        mode: EventTriggerMode | None = None,
143    ) -> "ManagedButton":
144        """Bind a command using the configured or overridden trigger mode.
145
146        Args:
147            command: The command to bind.
148            mode: Override the YAML-configured trigger mode.  When None
149                (default), uses the action's configured mode.  Pass an
150                explicit EventTriggerMode to ignore the config, e.g.
151                ``btn.bind(cmd, mode=EventTriggerMode.WHILE_TRUE)``.
152
153        Falls back to onTrue when no action is set and no override given.
154        """
155        if mode is None:
156            mode = (self._action.trigger_mode
157                    if self._action is not None
158                    else EventTriggerMode.ON_TRUE)
159        method_name = self._BINDING_MAP.get(mode, "onTrue")
160        return getattr(self, method_name)(command)

Bind a command using the configured or overridden trigger mode.

Args: command: The command to bind. mode: Override the YAML-configured trigger mode. When None (default), uses the action's configured mode. Pass an explicit EventTriggerMode to ignore the config, e.g. btn.bind(cmd, mode=EventTriggerMode.WHILE_TRUE).

Falls back to onTrue when no action is set and no override given.

def get(self) -> bool:
    def get(self) -> bool:
        """Return the current boolean state.

        The state is obtained by calling ``self._condition`` on every
        invocation; nothing is cached here.
        """
        return self._condition()

Return the current boolean state.

trigger: commands2.button.trigger.Trigger
    @property
    def trigger(self) -> Trigger:
        """Access the underlying Trigger for advanced composition.

        Returns:
            The Trigger currently held in ``self._trigger``.
        """
        return self._trigger

Access the underlying Trigger for advanced composition.

action: utils.controller.ActionDefinition | None
    @property
    def action(self) -> ActionDefinition | None:
        """The action definition, if any.

        Returns:
            The value held in ``self._action``; None when no action is set.
        """
        return self._action

The action definition, if any.

class ManagedRumble:
14class ManagedRumble:
15    """A managed rumble output channel.
16
17    Args:
18        action: The ActionDefinition this output represents (may be None).
19        setter: Callable that sets the rumble intensity (0.0 to 1.0).
20    """
21
22    def __init__(
23        self,
24        action: ActionDefinition | None,
25        setter: Callable[[float], None],
26    ):
27        self._action = action
28        self._setter = setter
29        self._stop_time: float | None = None
30        self._current_value: float = 0.0
31
32        # Binding info — set by the factory after construction.
33        # Format: "ControllerName.input_name (channel)"
34        self._binding_info: str = "unbound"
35
36    @property
37    def action(self) -> ActionDefinition | None:
38        return self._action
39
40    def set(self, value: float, timeout: float = 0.0) -> None:
41        """Set rumble intensity.
42
43        Args:
44            value: Intensity from 0.0 to 1.0.
45            timeout: If > 0, auto-stop after this many seconds.
46        """
47        self._current_value = value
48        self._setter(value)
49        if timeout > 0:
50            self._stop_time = time.monotonic() + timeout
51        else:
52            self._stop_time = None
53
54    def stop(self) -> None:
55        """Stop the rumble immediately."""
56        self._current_value = 0.0
57        self._setter(0.0)
58        self._stop_time = None
59
60    def update(self) -> None:
61        """Check timeouts — call once per robot cycle."""
62        if (self._stop_time is not None
63                and time.monotonic() >= self._stop_time):
64            self.stop()
65
66    def __str__(self) -> str:
67        name = self._action.qualified_name if self._action else "unbound"
68        return f"ManagedRumble('{name}', bind={self._binding_info})"
69
70    def _rebind(self, setter: Callable[[float], None]) -> None:
71        """Swap the output setter for dynamic remapping."""
72        self._setter = setter

A managed rumble output channel.

Args: action: The ActionDefinition this output represents (may be None). setter: Callable that sets the rumble intensity (0.0 to 1.0).

ManagedRumble( action: utils.controller.ActionDefinition | None, setter: Callable[[float], NoneType])
22    def __init__(
23        self,
24        action: ActionDefinition | None,
25        setter: Callable[[float], None],
26    ):
27        self._action = action
28        self._setter = setter
29        self._stop_time: float | None = None
30        self._current_value: float = 0.0
31
32        # Binding info — set by the factory after construction.
33        # Format: "ControllerName.input_name (channel)"
34        self._binding_info: str = "unbound"
action: utils.controller.ActionDefinition | None
    @property
    def action(self) -> ActionDefinition | None:
        """Return the ActionDefinition held in ``self._action`` (may be None)."""
        return self._action
def set(self, value: float, timeout: float = 0.0) -> None:
40    def set(self, value: float, timeout: float = 0.0) -> None:
41        """Set rumble intensity.
42
43        Args:
44            value: Intensity from 0.0 to 1.0.
45            timeout: If > 0, auto-stop after this many seconds.
46        """
47        self._current_value = value
48        self._setter(value)
49        if timeout > 0:
50            self._stop_time = time.monotonic() + timeout
51        else:
52            self._stop_time = None

Set rumble intensity.

Args: value: Intensity from 0.0 to 1.0. timeout: If > 0, auto-stop after this many seconds.

def stop(self) -> None:
    def stop(self) -> None:
        """Stop the rumble immediately.

        Records 0.0 as the current value, sends 0.0 to the setter, and
        clears any pending auto-stop deadline.
        """
        self._current_value = 0.0
        self._setter(0.0)
        self._stop_time = None

Stop the rumble immediately.

def update(self) -> None:
60    def update(self) -> None:
61        """Check timeouts — call once per robot cycle."""
62        if (self._stop_time is not None
63                and time.monotonic() >= self._stop_time):
64            self.stop()

Check timeouts — call once per robot cycle.

@dataclass
class ValidationIssue:
@dataclass
class ValidationIssue:
    """One finding produced by configuration validation."""
    level: str       # "error" or "warning"
    message: str
    context: str = ""

    def __str__(self) -> str:
        """Render as ``[LEVEL] context: message``; empty context is omitted."""
        tag = f"[{self.level.upper()}]"
        if not self.context:
            return f"{tag} {self.message}"
        return f"{tag} {self.context}: {self.message}"

A single validation finding.

ValidationIssue(level: str, message: str, context: str = '')
level: str
message: str
context: str = ''
def validate_config( config: utils.controller.FullConfig) -> list[ValidationIssue]:
def validate_config(config: FullConfig) -> list[ValidationIssue]:
    """Check a full controller configuration and collect every finding.

    Three passes run in order: per-action checks via ``_validate_action``,
    per-controller binding checks (input name recognized, action exists,
    action type compatible with the physical input category), and a
    final sweep that warns about actions left entirely at default values.

    Returns:
        All issues (errors and warnings) in the order they were found;
        an empty list means nothing was flagged.
    """
    found: list[ValidationIssue] = []

    # Pass 1: each action definition on its own.
    for definition in config.actions.values():
        found.extend(_validate_action(definition))

    # Pass 2: controller bindings.
    for port, ctrl in config.controllers.items():
        where = f"controller {port} ({ctrl.name or 'unnamed'})"

        for input_name, action_names in ctrl.bindings.items():
            # An unrecognized input name is reported, but its bound
            # actions are still checked below.
            if input_name not in ALL_INPUT_NAMES:
                found.append(ValidationIssue(
                    "error", f"Unknown input name '{input_name}'", where))

            category = get_input_category(input_name)

            for action_name in action_names:
                # A binding to a missing action gets no type checks.
                if action_name not in config.actions:
                    found.append(ValidationIssue(
                        "error",
                        f"Binding references unknown action '{action_name}'",
                        f"{where} / {input_name}"))
                    continue

                kind = config.actions[action_name].input_type

                # Type consistency: action input_type vs physical input.
                if kind == InputType.ANALOG:
                    if category == "button":
                        found.append(ValidationIssue(
                            "warning",
                            f"Analog action '{action_name}' bound to "
                            f"button input '{input_name}'",
                            where))

                if kind == InputType.BUTTON:
                    if category == "axis":
                        found.append(ValidationIssue(
                            "warning",
                            f"Button action '{action_name}' bound to "
                            f"axis input '{input_name}'",
                            where))

                if kind == InputType.BOOLEAN_TRIGGER:
                    if category != "axis":
                        found.append(ValidationIssue(
                            "error",
                            f"BOOLEAN_TRIGGER action '{action_name}' must "
                            f"be bound to an axis input, got '{input_name}'",
                            where))

                if kind == InputType.VIRTUAL_ANALOG:
                    if category not in ("button", "pov"):
                        found.append(ValidationIssue(
                            "error",
                            f"VIRTUAL_ANALOG action '{action_name}' must "
                            f"be bound to a button/POV input, "
                            f"got '{input_name}'",
                            where))

                if kind == InputType.OUTPUT:
                    if category != "output":
                        found.append(ValidationIssue(
                            "warning",
                            f"Output action '{action_name}' bound to "
                            f"non-output input '{input_name}'",
                            where))

    # Pass 3: actions whose every inspected field is still at its default.
    for definition in config.actions.values():
        untouched = (
            not definition.description
            and definition.input_type == InputType.BUTTON
            and definition.trigger_mode == EventTriggerMode.ON_TRUE
            and definition.deadband == 0.0
            and definition.scale == 1.0
            and not definition.extra)
        if untouched:
            found.append(ValidationIssue(
                "warning",
                "Action has all default values (possibly empty definition)",
                definition.qualified_name))

    return found

Validate a full controller configuration.

Returns a list of issues (errors and warnings). An empty list means the config is valid.