CLI Module API

This page documents the Python API for the CLI module. For user-facing CLI documentation, see the CLI Reference.

Main Entry Point

damiao_motor.cli.unified_main

unified_main() -> None

Unified CLI entry point with subcommands.

Main entry point for the damiao command-line tool. Provides a unified interface for scanning, configuring, and controlling DaMiao motors over CAN bus.

Available commands
  • scan: Scan for connected motors
  • send-cmd: Send command to motor (all control modes)
  • set-zero-command: Send zero command continuously
  • set-zero-position: Set current position to zero
  • set-can-timeout: Set CAN timeout alarm time
  • set-motor-id: Change motor receive ID
  • set-feedback-id: Change motor feedback ID
  • gui: Launch web-based GUI for motor control

Global options (available for all commands):
  • --version: Show version number and exit
  • --channel: CAN channel (default: can0)
  • --bustype: CAN bus type (default: socketcan)
  • --bitrate: CAN bitrate in bits per second (default: 1000000)

Examples:

# Scan for motors
damiao scan

# Send command in MIT mode
damiao send-cmd --id 1 --mode MIT --position 1.5 --velocity 0.0 --stiffness 3.0 --damping 0.5

# Set current position to zero
damiao set-zero-position --id 1
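
The dispatch pattern behind this entry point can be sketched in a few lines. This is a minimal, self-contained illustration rather than the module's actual code: `build_parser`, `run`, and the stand-in `cmd_scan` handler are hypothetical names, only the `scan` subcommand is reproduced, and the default ID range 0x01-0x10 is assumed to be inclusive. Each subparser registers its handler with `set_defaults(func=...)`, and the entry point calls `args.func(args)` after parsing.

```python
import argparse
from typing import Sequence


def cmd_scan(args: argparse.Namespace) -> str:
    """Stand-in handler (hypothetical): reports what a scan would test."""
    # Assumption: the default range 0x01-0x10 is inclusive (16 IDs).
    ids = args.ids if args.ids else list(range(0x01, 0x11))
    return f"scan {len(ids)} IDs on {args.channel} for {args.duration}s"


def build_parser() -> argparse.ArgumentParser:
    """Build a reduced parser with a single 'scan' subcommand."""
    parser = argparse.ArgumentParser(prog="damiao")
    subparsers = parser.add_subparsers(dest="command", required=True)

    scan = subparsers.add_parser("scan")
    scan.add_argument("--ids", type=int, nargs="+")
    scan.add_argument("--duration", type=float, default=0.5)
    scan.add_argument("--channel", type=str, default="can0")
    # The handler travels with the parsed namespace as args.func.
    scan.set_defaults(func=cmd_scan)
    return parser


def run(argv: Sequence[str]) -> str:
    """Parse argv and dispatch to the handler chosen by the subcommand."""
    args = build_parser().parse_args(argv)
    return args.func(args)
```

Passing an explicit `argv` list instead of reading `sys.argv` keeps the sketch easy to exercise from a test or a REPL, for example `run(["scan", "--ids", "1", "2", "3"])`.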
Source code in damiao_motor/cli.py
def unified_main() -> None:
    """
    Unified CLI entry point with subcommands.

    Main entry point for the `damiao` command-line tool. Provides a unified interface
    for scanning, configuring, and controlling DaMiao motors over CAN bus.

    Available commands:
        - scan: Scan for connected motors
        - send-cmd: Send command to motor (all control modes)
        - set-zero-command: Send zero command continuously
        - set-zero-position: Set current position to zero
        - set-can-timeout: Set CAN timeout alarm time
        - set-motor-id: Change motor receive ID
        - set-feedback-id: Change motor feedback ID
        - gui: Launch web-based GUI for motor control

    Global options (available for all commands):
        - --version: Show version number and exit
        - --channel: CAN channel (default: can0)
        - --bustype: CAN bus type (default: socketcan)
        - --bitrate: CAN bitrate in bits per second (default: 1000000)

    Examples:
        ```bash
        # Scan for motors
        damiao scan

        # Send command in MIT mode
        damiao send-cmd --id 1 --mode MIT --position 1.5 --velocity 0.0 --stiffness 3.0 --damping 0.5

        # Set current position to zero
        damiao set-zero-position --id 1
        ```
    """
    parser = argparse.ArgumentParser(
        description="DaMiao Motor CLI Tool - Control and configure DaMiao motors over CAN bus",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Scan for motors on default CAN channel (can0)
  damiao scan

  # Scan specific motor IDs with debug output
  damiao scan --ids 1 2 3 --debug

  # Send command in MIT mode
  damiao send-cmd --id 1 --mode MIT --position 1.5 --velocity 0.0 --stiffness 3.0 --damping 0.5

  # Send command in VEL mode
  damiao send-cmd --id 1 --mode VEL --velocity 3.0

  # Set current position to zero
  damiao set-zero-position --id 1

  # Set CAN timeout
  damiao set-can-timeout --id 1 --timeout 1000

  # Launch web GUI
  damiao gui
  damiao gui --port 8080

  # Use different CAN channel (can be before or after command)
  damiao scan --channel can_leader_l
  damiao send-cmd --id 1 --mode MIT --channel can_leader_l

  # Show version
  damiao --version

For more information about a specific command, use:
  damiao <command> --help
        """,
    )

    # Global arguments
    parser.add_argument(
        "--version",
        action="version",
        version=f"{__version__}",
        help="Show version number",
    )
    parser.add_argument(
        "--channel",
        type=str,
        default="can0",
        help="CAN channel (default: can0)",
    )
    parser.add_argument(
        "--bustype",
        type=str,
        default="socketcan",
        help="CAN bus type (default: socketcan)",
    )
    parser.add_argument(
        "--bitrate",
        type=int,
        default=1000000,
        help="CAN bitrate in bits per second (default: 1000000). Only used when bringing up interface.",
    )

    subparsers = parser.add_subparsers(
        dest="command",
        help="Available commands",
        required=True,
        metavar="COMMAND",
        title="Commands",
        description="Use 'damiao <command> --help' for more information about a specific command."
    )

    # Helper function to add global arguments to subcommands
    def add_global_args(subparser):
        """Add global arguments to a subcommand parser."""
        subparser.add_argument(
            "--channel",
            type=str,
            default="can0",
            help="CAN channel (default: can0)",
        )
        subparser.add_argument(
            "--bustype",
            type=str,
            default="socketcan",
            help="CAN bus type (default: socketcan)",
        )
        subparser.add_argument(
            "--bitrate",
            type=int,
            default=1000000,
            help="CAN bitrate in bits per second (default: 1000000). Only used when bringing up interface.",
        )
        subparser.add_argument(
            "--motor-type",
            type=str,
            required=True,
            choices=["4310", "4310P", "4340", "4340P", "6006", "8006", "8009", "10010L", "10010", "H3510", "G6215", "H6220", "JH11", "6248P", "3507"],
            dest="motor_type",
            help="Motor type for P/V/T presets (e.g. 4340, 4310, 3507)",
        )

    # scan command
    scan_parser = subparsers.add_parser(
        "scan",
        help="Scan for connected motors on CAN bus",
        description="Scan for connected DaMiao motors by sending zero commands and listening for responses.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Scan default ID range (0x01-0x10)
  damiao scan --motor-type 4340

  # Scan specific motor IDs
  damiao scan --motor-type 4340 --ids 1 2 3

  # Scan with longer listen duration
  damiao scan --motor-type 4340 --duration 2.0

  # Scan with motor type 4310
  damiao scan --motor-type 4310

  # Scan with debug output (print all raw CAN messages)
  damiao scan --motor-type 4340 --debug
        """
    )
    scan_parser.add_argument(
        "--ids",
        type=int,
        nargs="+",
        metavar="ID",
        help="Motor IDs to test (e.g., --ids 1 2 3). If not specified, tests IDs 0x01-0x10.",
    )
    scan_parser.add_argument(
        "--duration",
        type=float,
        default=0.5,
        help="Duration to listen for responses in seconds (default: 0.5)",
    )
    scan_parser.add_argument(
        "--debug",
        action="store_true",
        help="Print all raw CAN messages for debugging.",
    )
    add_global_args(scan_parser)
    scan_parser.set_defaults(func=cmd_scan)

    # set-zero-command (renamed from set-zero)
    zero_parser = subparsers.add_parser(
        "set-zero-command",
        help="Send zero command to a motor",
        description="Send a zero command continuously to a motor.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Send zero command continuously (loops until Ctrl+C)
  damiao set-zero-command --id 1

  # With custom frequency
  damiao set-zero-command --id 1 --frequency 50.0
        """
    )
    zero_parser.add_argument(
        "--id",
        type=int,
        required=True,
        dest="motor_id",
        help="Motor ID to send zero command to",
    )
    zero_parser.add_argument(
        "--frequency",
        type=float,
        default=100.0,
        help="Command frequency in Hz (default: 100.0)",
    )
    add_global_args(zero_parser)
    zero_parser.set_defaults(func=cmd_set_zero)

    # set-zero-position command
    zero_pos_parser = subparsers.add_parser(
        "set-zero-position",
        help="Set current position to zero",
        description="Set the current output shaft position to zero (save position zero).",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Set current position to zero
  damiao set-zero-position --id 1
        """
    )
    zero_pos_parser.add_argument(
        "--id",
        type=int,
        required=True,
        dest="motor_id",
        help="Motor ID",
    )
    add_global_args(zero_pos_parser)
    zero_pos_parser.set_defaults(func=cmd_set_zero_position)

    # set-can-timeout command
    timeout_parser = subparsers.add_parser(
        "set-can-timeout",
        help="Set CAN timeout alarm time (register 9)",
        description="Set the CAN timeout alarm time in milliseconds. Register 9 uses units of 50 microseconds (1 unit = 50us).",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Set CAN timeout to 1000 ms
  damiao set-can-timeout --id 1 --timeout 1000
        """
    )
    timeout_parser.add_argument(
        "--id",
        type=int,
        required=True,
        dest="motor_id",
        help="Motor ID",
    )
    timeout_parser.add_argument(
        "--timeout",
        type=int,
        required=True,
        dest="timeout_ms",
        help="Timeout in milliseconds (ms)",
    )
    add_global_args(timeout_parser)
    timeout_parser.set_defaults(func=cmd_set_can_timeout)

    # set-motor-id command
    set_motor_id_parser = subparsers.add_parser(
        "set-motor-id",
        help="Set motor receive ID (register 8)",
        description="Change the motor's receive ID (ESC_ID, register 8). This is the ID used to send commands to the motor.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Change motor ID from 1 to 2
  damiao set-motor-id --current 1 --target 2

Note: After changing the motor ID, you will need to use the new ID to communicate with the motor.
        """
    )
    set_motor_id_parser.add_argument(
        "--current",
        type=int,
        required=True,
        help="Current motor ID (to connect to the motor)",
    )
    set_motor_id_parser.add_argument(
        "--target",
        type=int,
        required=True,
        help="Target motor ID (new receive ID)",
    )
    add_global_args(set_motor_id_parser)
    set_motor_id_parser.set_defaults(func=cmd_set_motor_id)

    # set-feedback-id command
    set_feedback_id_parser = subparsers.add_parser(
        "set-feedback-id",
        help="Set motor feedback ID (register 7)",
        description="Change the motor's feedback ID (MST_ID, register 7). This is the ID used to identify feedback messages from the motor.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Change feedback ID to 3 (using motor ID 1 to connect)
  damiao set-feedback-id --current 1 --target 3

Note: The motor will now respond with feedback using the new feedback ID.
        """
    )
    set_feedback_id_parser.add_argument(
        "--current",
        type=int,
        required=True,
        help="Current motor ID (to connect to the motor)",
    )
    set_feedback_id_parser.add_argument(
        "--target",
        type=int,
        required=True,
        help="Target feedback ID (new MST_ID)",
    )
    add_global_args(set_feedback_id_parser)
    set_feedback_id_parser.set_defaults(func=cmd_set_feedback_id)

    # send-cmd command (unified command for all modes)
    send_cmd_parser = subparsers.add_parser(
        "send-cmd",
        help="Send command to motor (unified command for all control modes)",
        description="Send command to motor with specified control mode. Loops continuously until Ctrl+C.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # MIT mode (default)
  damiao send-cmd --id 1 --mode MIT --position 1.5 --velocity 0.0 --stiffness 3.0 --damping 0.5

  # POS_VEL mode
  damiao send-cmd --id 1 --mode POS_VEL --position 1.5 --velocity 2.0

  # VEL mode
  damiao send-cmd --id 1 --mode VEL --velocity 3.0

  # FORCE_POS mode
  damiao send-cmd --id 1 --mode FORCE_POS --position 1.5 --velocity-limit 50.0 --current-limit 0.8

  # With custom frequency
  damiao send-cmd --id 1 --mode MIT --position 1.5 --frequency 50.0
        """
    )
    send_cmd_parser.add_argument(
        "--id",
        type=int,
        required=True,
        dest="motor_id",
        help="Motor ID",
    )
    send_cmd_parser.add_argument(
        "--mode",
        type=str,
        default="MIT",
        choices=["MIT", "POS_VEL", "VEL", "FORCE_POS"],
        help="Control mode (default: MIT)",
    )
    send_cmd_parser.add_argument(
        "--position",
        type=float,
        default=0.0,
        help="Desired position (radians). Required for MIT, POS_VEL, FORCE_POS modes.",
    )
    send_cmd_parser.add_argument(
        "--velocity",
        type=float,
        default=0.0,
        help="Desired velocity (rad/s). Required for MIT, POS_VEL, VEL modes.",
    )
    send_cmd_parser.add_argument(
        "--stiffness",
        type=float,
        default=0.0,
        dest="stiffness",
        help="Stiffness (kp) for MIT mode, range 0–500 (default: 0.0)",
    )
    send_cmd_parser.add_argument(
        "--damping",
        type=float,
        default=0.0,
        dest="damping",
        help="Damping (kd) for MIT mode, range 0–5 (default: 0.0)",
    )
    send_cmd_parser.add_argument(
        "--feedforward-torque",
        type=float,
        default=0.0,
        dest="feedforward_torque",
        help="Feedforward torque for MIT mode (default: 0.0)",
    )
    send_cmd_parser.add_argument(
        "--velocity-limit",
        type=float,
        default=0.0,
        dest="velocity_limit",
        help="Velocity limit (rad/s, 0-100) for FORCE_POS mode",
    )
    send_cmd_parser.add_argument(
        "--current-limit",
        type=float,
        default=0.0,
        dest="current_limit",
        help="Torque current limit normalized (0.0-1.0) for FORCE_POS mode",
    )
    send_cmd_parser.add_argument(
        "--frequency",
        type=float,
        default=100.0,
        help="Command frequency in Hz (default: 100.0)",
    )
    add_global_args(send_cmd_parser)
    send_cmd_parser.set_defaults(func=cmd_send_cmd)

    # gui command
    gui_parser = subparsers.add_parser(
        "gui",
        help="Launch web-based GUI for motor control",
        description="Launch the web-based GUI for viewing and controlling DaMiao motors.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Start GUI on default host and port (http://127.0.0.1:5000)
  damiao gui

  # Start GUI on custom port
  damiao gui --port 8080

  # Start GUI on all interfaces
  damiao gui --host 0.0.0.0

  # Start GUI with production server (requires waitress)
  damiao gui --production

  # Start GUI with debug mode
  damiao gui --debug
        """
    )
    gui_parser.add_argument(
        "--host",
        type=str,
        default="127.0.0.1",
        help="Host to bind to (default: 127.0.0.1)",
    )
    gui_parser.add_argument(
        "--port",
        type=int,
        default=5000,
        help="Port to bind to (default: 5000)",
    )
    gui_parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug mode",
    )
    gui_parser.add_argument(
        "--production",
        action="store_true",
        help="Use production WSGI server (requires waitress)",
    )
    gui_parser.set_defaults(func=cmd_gui)

    args = parser.parse_args()

    # Execute the appropriate command
    try:
        args.func(args)
    except KeyboardInterrupt:
        print("\n\nInterrupted by user.")
        sys.exit(1)
    except Exception as e:
        print(f"\n\nError: {e}")
        sys.exit(1)

Command Handlers

damiao_motor.cli.cmd_scan

cmd_scan(args) -> None

Handle 'scan' subcommand.

Scans for connected motors on the CAN bus by sending zero commands and listening for feedback.

Parameters:

args (required)

Parsed command-line arguments containing:
  • channel: CAN channel (default: can0)
  • bustype: CAN bus type (default: socketcan)
  • ids: Optional list of motor IDs to test (default: 0x01-0x10)
  • duration: Duration to listen for responses in seconds (default: 0.5)
  • bitrate: CAN bitrate in bits per second (default: 1000000)
  • debug: Print all raw CAN messages for debugging (default: False)
  • motor_type: Motor type for P/V/T presets (required; read by the handler although the docstring omits it)

Examples:

# Scan default ID range (0x01-0x10)
damiao scan

# Scan specific motor IDs
damiao scan --ids 1 2 3

# Scan with longer listen duration
damiao scan --duration 2.0

# Scan with debug output
damiao scan --debug
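
As a small illustration of how the scanner reports IDs, the sketch below reproduces the two formatting expressions used in the handler's source: the configuration line lists the requested IDs with `hex()`, and the summary prints each responding ID as zero-padded hexadecimal plus decimal. The function names `describe_ids` and `summary_line` are hypothetical and exist only for this example.

```python
from typing import Optional, Sequence


def describe_ids(ids: Optional[Sequence[int]]) -> str:
    """Format the IDs under test as the configuration box does."""
    if ids:
        return ", ".join(hex(i) for i in ids)
    # No --ids given: the handler prints the default range instead.
    return "0x01-0x10 (default range)"


def summary_line(motor_id: int) -> str:
    """Format one responding motor as zero-padded hex plus decimal."""
    return f"Motor ID: 0x{motor_id:02X} ({motor_id})"
```

Note the two styles differ: `hex(1)` yields `0x1`, while the summary format pads to two uppercase hex digits.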
Source code in damiao_motor/cli.py
def cmd_scan(args) -> None:
    """
    Handle 'scan' subcommand.

    Scans for connected motors on the CAN bus by sending zero commands and listening for feedback.

    Args:
        args: Parsed command-line arguments containing:
            - channel: CAN channel (default: can0)
            - bustype: CAN bus type (default: socketcan)
            - ids: Optional list of motor IDs to test (default: 0x01-0x10)
            - duration: Duration to listen for responses in seconds (default: 0.5)
            - bitrate: CAN bitrate in bits per second (default: 1000000)
            - debug: Print all raw CAN messages for debugging (default: False)

    Examples:
        ```bash
        # Scan default ID range (0x01-0x10)
        damiao scan

        # Scan specific motor IDs
        damiao scan --ids 1 2 3

        # Scan with longer listen duration
        damiao scan --duration 2.0

        # Scan with debug output
        damiao scan --debug
        ```
    """
    # Print header and configuration in a single box
    print()
    top_border = f"{BOX_CORNER_TL}{BOX_HORIZONTAL * 78}{BOX_CORNER_TR}"
    print(top_border)
    # Header line
    header_text = f" {GREEN}DaMiao Motor Scanner{RESET}"
    print(f"{BOX_VERTICAL}{pad_with_ansi(header_text, 78)}{BOX_VERTICAL}")
    # Separator line
    print(f"{BOX_JOIN_LEFT}{BOX_HORIZONTAL * 78}{BOX_JOIN_RIGHT}")
    # Configuration lines
    config_lines = [
        f" CAN channel: {args.channel}",
        f" Bus type: {args.bustype}",
        f" Motor type: {args.motor_type}",
        f" Testing motor IDs: {', '.join([hex(i) for i in args.ids]) if args.ids else '0x01-0x10 (default range)'}",
        f" Listen duration: {args.duration}s",
    ]
    if args.debug:
        config_lines.append(" Debug mode: ENABLED (printing all raw CAN messages)")

    for line in config_lines:
        print(f"{BOX_VERTICAL}{pad_with_ansi(line, 78)}{BOX_VERTICAL}")
    bottom_border = f"{BOX_CORNER_BL}{BOX_HORIZONTAL * 78}{BOX_CORNER_BR}"
    print(bottom_border)
    print()

    try:
        responded = scan_motors(
            channel=args.channel,
            bustype=args.bustype,
            motor_ids=args.ids,
            duration_s=args.duration,
            bitrate=args.bitrate,
            debug=args.debug,
            motor_type=args.motor_type,
        )

        # Print final summary
        print()
        if responded:
            # Combined scan summary box
            top_border = f"{BOX_CORNER_TL}{BOX_HORIZONTAL * 78}{BOX_CORNER_TR}"
            print(top_border)
            # Header line
            header_text = f" {GREEN}Scan Summary{RESET}"
            print(f"{BOX_VERTICAL}{pad_with_ansi(header_text, 78)}{BOX_VERTICAL}")
            # Separator line
            print(f"{BOX_JOIN_LEFT}{BOX_HORIZONTAL * 78}{BOX_JOIN_RIGHT}")
            # Summary lines
            summary_lines = [
                f" Found {len(responded)} motor(s):"
            ]
            for motor_id in sorted(responded):
                summary_lines.append(f"   • Motor ID: 0x{motor_id:02X} ({motor_id})")
            for line in summary_lines:
                print(f"{BOX_VERTICAL}{pad_with_ansi(line, 78)}{BOX_VERTICAL}")
            bottom_border = f"{BOX_CORNER_BL}{BOX_HORIZONTAL * 78}{BOX_CORNER_BR}"
            print(bottom_border)
        else:
            summary_lines = [
                "No motors responded.",
                "",
                "Check:",
                "  • CAN interface is up (e.g., sudo ip link set can0 up type can bitrate 1000000)",
                "  • Motors are powered and connected",
                "  • Motor IDs match the tested range",
            ]
            print_warning_box("Scan Summary - No Motors Found", summary_lines, width=80)

    except KeyboardInterrupt:
        print("\n\nInterrupted by user.")
    except Exception as e:
        print(f"\n\nError: {e}")
        raise

damiao_motor.cli.cmd_send_cmd

cmd_send_cmd(args) -> None

Handle unified 'send-cmd' subcommand.

Sends command to motor with specified control mode. Loops continuously until Ctrl+C. Supports MIT, POS_VEL, VEL, and FORCE_POS control modes.

Parameters:

args (required)

Parsed command-line arguments containing:
  • motor_id: Motor ID (required)
  • mode: Control mode, one of "MIT", "POS_VEL", "VEL", or "FORCE_POS" (default: MIT)
  • position: Desired position (radians), used by MIT, POS_VEL, and FORCE_POS modes
  • velocity: Desired velocity (rad/s), used by MIT, POS_VEL, and VEL modes
  • stiffness: Stiffness (kp) for MIT mode (default: 0.0)
  • damping: Damping (kd) for MIT mode (default: 0.0)
  • feedforward_torque: Feedforward torque for MIT mode (default: 0.0)
  • velocity_limit: Velocity limit (rad/s, 0-100) for FORCE_POS mode
  • current_limit: Torque current limit normalized (0.0-1.0) for FORCE_POS mode
  • frequency: Command frequency in Hz (default: 100.0)
  • channel: CAN channel (default: can0)
  • bustype: CAN bus type (default: socketcan)
  • bitrate: CAN bitrate in bits per second (default: 1000000)
  • motor_type: Motor type for P/V/T presets (required; read by the handler although the docstring omits it)

Examples:

# MIT mode (default)
damiao send-cmd --id 1 --mode MIT --position 1.5 --velocity 0.0 --stiffness 3.0 --damping 0.5

# POS_VEL mode
damiao send-cmd --id 1 --mode POS_VEL --position 1.5 --velocity 2.0

# VEL mode
damiao send-cmd --id 1 --mode VEL --velocity 3.0

# FORCE_POS mode
damiao send-cmd --id 1 --mode FORCE_POS --position 1.5 --velocity-limit 50.0 --current-limit 0.8
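
Each control mode is addressed at a different CAN ID. The sketch below mirrors the `can_id_map` in the handler's source, where the ID is the motor ID plus a per-mode offset and an unknown mode falls back to the bare motor ID. `MODE_OFFSETS` and `command_can_id` are hypothetical names used only for this illustration.

```python
# Per-mode offsets taken from the can_id_map in cmd_send_cmd.
MODE_OFFSETS = {
    "MIT": 0x000,
    "POS_VEL": 0x100,
    "VEL": 0x200,
    "FORCE_POS": 0x300,
}


def command_can_id(mode: str, motor_id: int) -> int:
    """Return the CAN ID that the handler prints for a given mode."""
    # Unknown modes fall back to the plain motor ID, as in the handler.
    return MODE_OFFSETS.get(mode, 0x000) + motor_id


def format_can_id(can_id: int) -> str:
    """Format the ID the way the handler prints it (three hex digits)."""
    return f"0x{can_id:03X}"
```

For motor ID 1 this gives `0x001` in MIT mode, `0x101` in POS_VEL, `0x201` in VEL, and `0x301` in FORCE_POS.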
Source code in damiao_motor/cli.py
def cmd_send_cmd(args) -> None:
    """
    Handle unified 'send-cmd' subcommand.

    Sends command to motor with specified control mode. Loops continuously until Ctrl+C.
    Supports MIT, POS_VEL, VEL, and FORCE_POS control modes.

    Args:
        args: Parsed command-line arguments containing:
            - motor_id: Motor ID (required)
            - mode: Control mode - "MIT", "POS_VEL", "VEL", or "FORCE_POS" (default: MIT)
            - position: Desired position (radians) - for MIT, POS_VEL, FORCE_POS modes
            - velocity: Desired velocity (rad/s) - for MIT, POS_VEL, VEL modes
            - stiffness: Stiffness (kp) for MIT mode (default: 0.0)
            - damping: Damping (kd) for MIT mode (default: 0.0)
            - feedforward_torque: Feedforward torque for MIT mode (default: 0.0)
            - velocity_limit: Velocity limit (rad/s, 0-100) for FORCE_POS mode
            - current_limit: Torque current limit normalized (0.0-1.0) for FORCE_POS mode
            - frequency: Command frequency in Hz (default: 100.0)
            - channel: CAN channel (default: can0)
            - bustype: CAN bus type (default: socketcan)
            - bitrate: CAN bitrate in bits per second (default: 1000000)

    Examples:
        ```bash
        # MIT mode (default)
        damiao send-cmd --id 1 --mode MIT --position 1.5 --velocity 0.0 --stiffness 3.0 --damping 0.5

        # POS_VEL mode
        damiao send-cmd --id 1 --mode POS_VEL --position 1.5 --velocity 2.0

        # VEL mode
        damiao send-cmd --id 1 --mode VEL --velocity 3.0

        # FORCE_POS mode
        damiao send-cmd --id 1 --mode FORCE_POS --position 1.5 --velocity-limit 50.0 --current-limit 0.8
        ```
    """
    print("=" * 60)
    print("DaMiao Motor - Send Command")
    print("=" * 60)
    print(f"CAN channel: {args.channel}")
    print(f"Motor ID: 0x{args.motor_id:02X} ({args.motor_id})")
    print(f"Control Mode: {args.mode}")
    if args.mode == "MIT":
        print(f"  Position: {args.position:.6f} rad")
        print(f"  Velocity: {args.velocity:.6f} rad/s")
        print(f"  Stiffness (kp): {args.stiffness:.6f}")
        print(f"  Damping (kd): {args.damping:.6f}")
        print(f"  Feedforward Torque: {args.feedforward_torque:.6f} Nm")
    elif args.mode == "POS_VEL":
        print(f"  Position: {args.position:.6f} rad")
        print(f"  Velocity: {args.velocity:.6f} rad/s")
    elif args.mode == "VEL":
        print(f"  Velocity: {args.velocity:.6f} rad/s")
    elif args.mode == "FORCE_POS":
        print(f"  Position: {args.position:.6f} rad")
        print(f"  Velocity Limit: {args.velocity_limit:.6f} rad/s")
        print(f"  Current Limit: {args.current_limit:.6f}")
    print(f"Frequency: {args.frequency} Hz")
    print("=" * 60)
    print()

    # Check and bring up CAN interface if needed
    if args.bustype == "socketcan":
        if not check_and_bring_up_can_interface(args.channel, bitrate=args.bitrate):
            print(f"⚠ Warning: Could not verify {args.channel} is ready. Continuing anyway...")

    controller = DaMiaoController(channel=args.channel, bustype=args.bustype)

    try:
        motor = controller.add_motor(motor_id=args.motor_id, feedback_id=0x00, motor_type=args.motor_type)

        # Ensure control mode (register 10) matches the desired mode
        motor.ensure_control_mode(args.mode)

        # Determine CAN ID based on mode
        can_id_map = {
            "MIT": args.motor_id,
            "POS_VEL": 0x100 + args.motor_id,
            "VEL": 0x200 + args.motor_id,
            "FORCE_POS": 0x300 + args.motor_id,
        }
        can_id = can_id_map.get(args.mode, args.motor_id)

        print(f"Sending command continuously (press Ctrl+C to stop)...")
        print(f"  CAN ID: 0x{can_id:03X}")
        print(f"  Mode: {args.mode}")
        print(f"  Frequency: {args.frequency} Hz")
        print()

        interval = 1.0 / args.frequency if args.frequency > 0 else 0.01

        try:
            while True:
                if args.mode == "MIT":
                    motor.send_cmd(
                        target_position=args.position,
                        target_velocity=args.velocity,
                        stiffness=args.stiffness,
                        damping=args.damping,
                        feedforward_torque=args.feedforward_torque,
                        control_mode="MIT"
                    )
                elif args.mode == "POS_VEL":
                    motor.send_cmd(
                        target_position=args.position,
                        target_velocity=args.velocity,
                        control_mode="POS_VEL"
                    )
                elif args.mode == "VEL":
                    motor.send_cmd(
                        target_velocity=args.velocity,
                        control_mode="VEL"
                    )
                elif args.mode == "FORCE_POS":
                    motor.send_cmd(
                        target_position=args.position,
                        velocity_limit=args.velocity_limit,
                        current_limit=args.current_limit,
                        control_mode="FORCE_POS"
                    )

                controller.poll_feedback()

                if motor.state:
                    print_motor_state(motor.state)

                time.sleep(interval)
        except KeyboardInterrupt:
            print("\n\nStopped by user.")

    except Exception as e:
        print(f"Error: {e}")
        raise
    finally:
        print("Shutting down controller...")
        controller.shutdown()

damiao_motor.cli.cmd_set_zero

cmd_set_zero(args) -> None

Handle 'set-zero-command' subcommand.

Sends a zero command to a motor continuously. Loops until interrupted with Ctrl+C.

Parameters:

args (required)

Parsed command-line arguments containing:
  • motor_id: Motor ID to send zero command to (required)
  • frequency: Command frequency in Hz (default: 100.0)
  • channel: CAN channel (default: can0)
  • bustype: CAN bus type (default: socketcan)
  • bitrate: CAN bitrate in bits per second (default: 1000000)
  • motor_type: Motor type for P/V/T presets (required; read by the handler although the docstring omits it)

Examples:

# Send zero command continuously
damiao set-zero-command --id 1

# With custom frequency
damiao set-zero-command --id 1 --frequency 50.0
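
The `--frequency` option sets the pause between commands. The handler's source computes the interval as `1.0 / frequency`, falling back to 0.01 s when the frequency is zero or negative. The sketch below isolates that calculation and pairs it with a bounded loop. `command_interval` and `run_fixed_rate` are hypothetical names, and the real handler loops until Ctrl+C rather than for a fixed number of iterations.

```python
import time
from typing import Callable


def command_interval(frequency_hz: float) -> float:
    """Seconds to sleep between commands; 0.01 s for non-positive input."""
    return 1.0 / frequency_hz if frequency_hz > 0 else 0.01


def run_fixed_rate(send: Callable[[], None], frequency_hz: float, iterations: int) -> int:
    """Call send() repeatedly, sleeping one interval after each call."""
    interval = command_interval(frequency_hz)
    for _ in range(iterations):
        send()
        time.sleep(interval)
    return iterations
```

Because the loop sleeps for the full interval after each send, the achieved rate is somewhat lower than the requested frequency once the time spent sending and polling is included.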
Source code in damiao_motor/cli.py
def cmd_set_zero(args) -> None:
    """
    Handle 'set-zero-command' subcommand.

    Sends a zero command to a motor continuously.
    Loops until interrupted with Ctrl+C.

    Args:
        args: Parsed command-line arguments containing:
            - motor_id: Motor ID to send zero command to (required)
            - frequency: Command frequency in Hz (default: 100.0)
            - channel: CAN channel (default: can0)
            - bustype: CAN bus type (default: socketcan)
            - bitrate: CAN bitrate in bits per second (default: 1000000)

    Examples:
        ```bash
        # Send zero command continuously
        damiao set-zero-command --id 1

        # With custom frequency
        damiao set-zero-command --id 1 --frequency 50.0
        ```
    """
    print("=" * 60)
    print("DaMiao Motor - Set Zero Command")
    print("=" * 60)
    print(f"CAN channel: {args.channel}")
    print(f"Motor ID: 0x{args.motor_id:02X} ({args.motor_id})")
    print("=" * 60)
    print()

    # Check and bring up CAN interface if needed
    if args.bustype == "socketcan":
        if not check_and_bring_up_can_interface(args.channel, bitrate=args.bitrate):
            print(f"⚠ Warning: Could not verify {args.channel} is ready. Continuing anyway...")

    controller = DaMiaoController(channel=args.channel, bustype=args.bustype)

    try:
        motor = controller.add_motor(motor_id=args.motor_id, feedback_id=0x00, motor_type=args.motor_type)

        # Ensure control mode is set to MIT (register 10 = 1) for zero command
        try:
            motor.ensure_control_mode("MIT")
        except Exception as e:
            print(f"⚠ Warning: Could not verify/set control mode: {e}")
            print("  Continuing anyway, but motor may not respond correctly.")

        print("Sending zero command continuously (press Ctrl+C to stop)...")
        print("  Command: pos=0, vel=0, torq=0, kp=0, kd=0")
        print(f"  Frequency: {args.frequency} Hz")
        print()

        interval = 1.0 / args.frequency if args.frequency > 0 else 0.01

        try:
            while True:
                motor.set_zero_command()
                controller.poll_feedback()

                if motor.state:
                    print_motor_state(motor.state)

                time.sleep(interval)
        except KeyboardInterrupt:
            print("\n\nStopped by user.")

    except Exception as e:
        print(f"Error: {e}")
        raise
    finally:
        print("Shutting down controller...")
        controller.shutdown()

damiao_motor.cli.cmd_set_zero_position

cmd_set_zero_position(args) -> None

Handle 'set-zero-position' subcommand.

Sets the current output shaft position to zero (save position zero).

Parameters:

Name Type Description Default
args

Parsed command-line arguments containing:

- motor_id: Motor ID (required)
- channel: CAN channel (default: can0)
- bustype: CAN bus type (default: socketcan)
- bitrate: CAN bitrate in bits per second (default: 1000000)

required

Examples:

# Set current position to zero
damiao set-zero-position --id 1
Source code in damiao_motor/cli.py
def cmd_set_zero_position(args) -> None:
    """
    Handle 'set-zero-position' subcommand.

    Sets the current output shaft position to zero (save position zero).

    Args:
        args: Parsed command-line arguments containing:
            - motor_id: Motor ID (required)
            - channel: CAN channel (default: can0)
            - bustype: CAN bus type (default: socketcan)
            - bitrate: CAN bitrate in bits per second (default: 1000000)

    Examples:
        ```bash
        # Set current position to zero
        damiao set-zero-position --id 1
        ```
    """
    print("=" * 60)
    print("DaMiao Motor - Set Zero Position")
    print("=" * 60)
    print(f"CAN channel: {args.channel}")
    print(f"Motor ID: 0x{args.motor_id:02X} ({args.motor_id})")
    print("=" * 60)
    print()

    # Check and bring up CAN interface if needed
    if args.bustype == "socketcan":
        if not check_and_bring_up_can_interface(args.channel, bitrate=args.bitrate):
            print(f"⚠ Warning: Could not verify {args.channel} is ready. Continuing anyway...")

    controller = DaMiaoController(channel=args.channel, bustype=args.bustype)

    try:
        motor = controller.add_motor(motor_id=args.motor_id, feedback_id=0x00, motor_type=args.motor_type)

        print("Setting current position to zero...")
        motor.set_zero_position()
        print("✓ Position zero set")

    except Exception as e:
        print(f"Error: {e}")
        raise
    finally:
        controller.shutdown()

damiao_motor.cli.cmd_set_can_timeout

cmd_set_can_timeout(args) -> None

Handle 'set-can-timeout' subcommand.

Sets the CAN timeout alarm time (register 9) in milliseconds.

Parameters:

Name Type Description Default
args

Parsed command-line arguments containing:

- motor_id: Motor ID (required)
- timeout_ms: Timeout in milliseconds (required)
- channel: CAN channel (default: can0)
- bustype: CAN bus type (default: socketcan)
- bitrate: CAN bitrate in bits per second (default: 1000000)

required
Note

Register 9 stores the timeout in units of 50 microseconds (1 register unit = 50 µs).

The timeout is converted internally from milliseconds to register units: register_value = timeout_ms × 20

Examples:

- 1000 ms = 20,000 register units
- 50 ms = 1,000 register units

The value is stored to flash memory after setting.

Examples:

# Set CAN timeout to 1000 ms
damiao set-can-timeout --id 1 --timeout 1000
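
The unit conversion described in the note can be sketched as follows. The function names are hypothetical and the rounding to the nearest register unit is an assumption; this page does not show how `set_can_timeout` handles fractional values:

```python
REGISTER_UNIT_US = 50  # one register 9 unit is 50 microseconds (from the note)


def timeout_ms_to_register(timeout_ms: float) -> int:
    """Convert a timeout in milliseconds to register 9 units."""
    if timeout_ms < 0:
        raise ValueError("timeout must be non-negative")
    # 1 ms = 1000 us, so each millisecond is 1000 / 50 = 20 register units.
    # Rounding to the nearest unit is an assumption made for this sketch.
    return round(timeout_ms * 1000 / REGISTER_UNIT_US)


def register_to_timeout_ms(register_value: int) -> float:
    """Convert register 9 units back to milliseconds."""
    return register_value * REGISTER_UNIT_US / 1000


for ms in (1000, 50):
    print(f"{ms} ms -> {timeout_ms_to_register(ms)} register units")
```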
Source code in damiao_motor/cli.py
def cmd_set_can_timeout(args) -> None:
    """
    Handle 'set-can-timeout' subcommand.

    Sets the CAN timeout alarm time (register 9) in milliseconds.

    Args:
        args: Parsed command-line arguments containing:
            - motor_id: Motor ID (required)
            - timeout_ms: Timeout in milliseconds (required)
            - channel: CAN channel (default: can0)
            - bustype: CAN bus type (default: socketcan)
            - bitrate: CAN bitrate in bits per second (default: 1000000)

    Note:
        Register 9 stores timeout in units of 50 microseconds: **1 register unit = 50 microseconds**.

        The timeout is internally converted from milliseconds to register units using:
        register_value = timeout_ms × 20

        Examples:
        - 1000 ms = 20,000 register units
        - 50 ms = 1,000 register units

        The value is stored to flash memory after setting.

    Examples:
        ```bash
        # Set CAN timeout to 1000 ms
        damiao set-can-timeout --id 1 --timeout 1000
        ```
    """
    print("=" * 60)
    print("DaMiao Motor - Set CAN Timeout")
    print("=" * 60)
    print(f"CAN channel: {args.channel}")
    print(f"Motor ID: 0x{args.motor_id:02X} ({args.motor_id})")
    print(f"Timeout: {args.timeout_ms} ms")
    print("=" * 60)
    print()

    # Check and bring up CAN interface if needed
    if args.bustype == "socketcan":
        if not check_and_bring_up_can_interface(args.channel, bitrate=args.bitrate):
            print(f"⚠ Warning: Could not verify {args.channel} is ready. Continuing anyway...")

    controller = DaMiaoController(channel=args.channel, bustype=args.bustype)

    try:
        motor = controller.add_motor(motor_id=args.motor_id, feedback_id=0x00, motor_type=args.motor_type)

        print(f"Setting CAN timeout to {args.timeout_ms} ms (register 9)...")
        motor.set_can_timeout(args.timeout_ms)

        # Store parameters to flash
        print("Storing parameters to flash...")
        try:
            motor.store_parameters()
            print("✓ Parameters stored to flash")
        except Exception as e:
            print(f"⚠ Warning: Could not store parameters: {e}")

        print()
        print(f"✓ CAN timeout set to {args.timeout_ms} ms")

    except Exception as e:
        print(f"Error: {e}")
        raise
    finally:
        controller.shutdown()

damiao_motor.cli.cmd_set_motor_id

cmd_set_motor_id(args) -> None

Handle 'set-motor-id' subcommand.

Changes the motor's receive ID (ESC_ID, register 8). This is the ID used to send commands to the motor.

Parameters:

Name Type Description Default
args

Parsed command-line arguments containing:

- current: Current motor ID (to connect to the motor) (required)
- target: Target motor ID (new receive ID) (required)
- channel: CAN channel (default: can0)
- bustype: CAN bus type (default: socketcan)
- bitrate: CAN bitrate in bits per second (default: 1000000)

required
Note

After changing the motor ID, you will need to use the new ID to communicate with the motor. The value is stored to flash memory after setting.

Examples:

# Change motor ID from 1 to 2
damiao set-motor-id --current 1 --target 2
Source code in damiao_motor/cli.py
def cmd_set_motor_id(args) -> None:
    """
    Handle 'set-motor-id' subcommand.

    Changes the motor's receive ID (ESC_ID, register 8). This is the ID used to send commands to the motor.

    Args:
        args: Parsed command-line arguments containing:
            - current: Current motor ID (to connect to the motor) (required)
            - target: Target motor ID (new receive ID) (required)
            - channel: CAN channel (default: can0)
            - bustype: CAN bus type (default: socketcan)
            - bitrate: CAN bitrate in bits per second (default: 1000000)

    Note:
        After changing the motor ID, you will need to use the new ID to communicate with the motor.
        The value is stored to flash memory after setting.

    Examples:
        ```bash
        # Change motor ID from 1 to 2
        damiao set-motor-id --current 1 --target 2
        ```
    """
    print("=" * 60)
    print("DaMiao Motor - Set Motor ID (Receive ID)")
    print("=" * 60)
    print(f"CAN channel: {args.channel}")
    print(f"Current Motor ID: 0x{args.current:02X} ({args.current})")
    print(f"Target Motor ID: 0x{args.target:02X} ({args.target})")
    print("=" * 60)
    print()

    if args.current == args.target:
        print("Current and target IDs are the same. No change needed.")
        return

    # Check and bring up CAN interface if needed
    if args.bustype == "socketcan":
        if not check_and_bring_up_can_interface(args.channel, bitrate=args.bitrate):
            print(f"⚠ Warning: Could not verify {args.channel} is ready. Continuing anyway...")

    controller = DaMiaoController(channel=args.channel, bustype=args.bustype)

    try:
        # Use current ID to connect
        motor = controller.add_motor(motor_id=args.current, feedback_id=0x00, motor_type=args.motor_type)

        print("Reading current register values...")
        time.sleep(0.1)
        controller.poll_feedback()

        # Read current receive ID (register 8)
        try:
            current_receive_id = motor.get_register(8, timeout=1.0)
            print(f"Current Receive ID (register 8): {int(current_receive_id)} (0x{int(current_receive_id):02X})")
        except Exception as e:
            print(f"⚠ Warning: Could not read register 8: {e}")
            print("  Proceeding with write anyway...")

        print(f"Writing new Receive ID (register 8) = {args.target} (0x{args.target:02X})...")
        motor.write_register(8, args.target)

        # Store parameters to flash
        print("Storing parameters to flash...")
        try:
            motor.store_parameters()
            print("✓ Parameters stored to flash")
        except Exception as e:
            print(f"⚠ Warning: Could not store parameters: {e}")

        print()
        print(f"✓ Motor ID changed from 0x{args.current:02X} to 0x{args.target:02X}")
        print(f"  Note: You may need to reconnect using the new ID: 0x{args.target:02X}")

    except Exception as e:
        print(f"Error: {e}")
        raise
    finally:
        controller.shutdown()

damiao_motor.cli.cmd_set_feedback_id

cmd_set_feedback_id(args) -> None

Handle 'set-feedback-id' subcommand.

Changes the motor's feedback ID (MST_ID, register 7). This is the ID used to identify feedback messages from the motor.

Parameters:

Name Type Description Default
args

Parsed command-line arguments containing:

- current: Current motor ID (to connect to the motor) (required)
- target: Target feedback ID (new MST_ID) (required)
- channel: CAN channel (default: can0)
- bustype: CAN bus type (default: socketcan)
- bitrate: CAN bitrate in bits per second (default: 1000000)

required
Note

After the change, the motor sends feedback using the new feedback ID. The value is stored to flash memory after setting.

Examples:

# Change feedback ID to 3 (using motor ID 1 to connect)
damiao set-feedback-id --current 1 --target 3
Source code in damiao_motor/cli.py
def cmd_set_feedback_id(args) -> None:
    """
    Handle 'set-feedback-id' subcommand.

    Changes the motor's feedback ID (MST_ID, register 7). This is the ID used to identify feedback messages from the motor.

    Args:
        args: Parsed command-line arguments containing:
            - current: Current motor ID (to connect to the motor) (required)
            - target: Target feedback ID (new MST_ID) (required)
            - channel: CAN channel (default: can0)
            - bustype: CAN bus type (default: socketcan)
            - bitrate: CAN bitrate in bits per second (default: 1000000)

    Note:
        After the change, the motor sends feedback using the new feedback ID.
        The value is stored to flash memory after setting.

    Examples:
        ```bash
        # Change feedback ID to 3 (using motor ID 1 to connect)
        damiao set-feedback-id --current 1 --target 3
        ```
    """
    print("=" * 60)
    print("DaMiao Motor - Set Feedback ID (MST_ID)")
    print("=" * 60)
    print(f"CAN channel: {args.channel}")
    print(f"Current Motor ID: 0x{args.current:02X} ({args.current})")
    print(f"Target Feedback ID: 0x{args.target:02X} ({args.target})")
    print("=" * 60)
    print()

    # Check and bring up CAN interface if needed
    if args.bustype == "socketcan":
        if not check_and_bring_up_can_interface(args.channel, bitrate=args.bitrate):
            print(f"⚠ Warning: Could not verify {args.channel} is ready. Continuing anyway...")

    controller = DaMiaoController(channel=args.channel, bustype=args.bustype)

    try:
        # Use current motor ID to connect
        motor = controller.add_motor(motor_id=args.current, feedback_id=0x00, motor_type=args.motor_type)

        print("Reading current register values...")
        time.sleep(0.1)
        controller.poll_feedback()

        # Read current feedback ID (register 7)
        try:
            current_feedback_id = motor.get_register(7, timeout=1.0)
            print(f"Current Feedback ID (register 7): {int(current_feedback_id)} (0x{int(current_feedback_id):02X})")
        except Exception as e:
            print(f"⚠ Warning: Could not read register 7: {e}")
            print("  Proceeding with write anyway...")

        print(f"Writing new Feedback ID (register 7) = {args.target} (0x{args.target:02X})...")
        motor.write_register(7, args.target)

        # Store parameters to flash
        print("Storing parameters to flash...")
        try:
            motor.store_parameters()
            print("✓ Parameters stored to flash")
        except Exception as e:
            print(f"⚠ Warning: Could not store parameters: {e}")

        print()
        print(f"✓ Feedback ID changed to 0x{args.target:02X}")
        print(f"  Note: Motor will now respond with feedback ID 0x{args.target:02X}")

    except Exception as e:
        print(f"Error: {e}")
        raise
    finally:
        controller.shutdown()

Utility Functions

damiao_motor.cli.scan_motors

scan_motors(channel: str = 'can0', bustype: str = 'socketcan', motor_ids: list[int] | None = None, duration_s: float = 3.0, bitrate: int = 1000000, debug: bool = False, *, motor_type: str) -> Set[int]

Scan for connected motors by sending zero commands and listening for feedback.

Parameters:

Name Type Description Default
channel str

CAN channel (e.g., "can0")

'can0'
bustype str

CAN bus type (e.g., "socketcan")

'socketcan'
motor_ids list[int] | None

List of motor IDs to test. If None, tests IDs 0x01-0x10.

None
duration_s float

How long to listen for responses (seconds)

3.0
bitrate int

CAN bitrate in bits per second, used when bringing up a socketcan interface

1000000
debug bool

If True, print sent commands and raw received CAN messages

False
motor_type str

Motor type for P/V/T presets (e.g. 4340, 4310, 3507). Required.

required

Returns:

Type Description
Set[int]

Set of motor IDs that responded with feedback.
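
While listening, `scan_motors` takes a logical motor ID from the low four bits of the first data byte of each 8-byte frame, and flags any logical ID or arbitration ID that shows up more than once. A self-contained sketch of that bookkeeping; `find_conflicts` is a hypothetical name, the `(arbitration_id, data)` tuples stand in for received CAN messages, and the frame contents are made up for illustration:

```python
from typing import Iterable, Set, Tuple

Frame = Tuple[int, bytes]  # (arbitration_id, data), a stand-in for a CAN message


def find_conflicts(frames: Iterable[Frame]) -> Tuple[Set[int], Set[int]]:
    """Return (conflicted logical motor IDs, conflicted arbitration IDs)."""
    seen_motor_ids: Set[int] = set()
    seen_arb_ids: Set[int] = set()
    conflicted_motor_ids: Set[int] = set()
    conflicted_arb_ids: Set[int] = set()

    for arb_id, data in frames:
        if len(data) != 8:
            continue  # only 8-byte frames are treated as feedback
        logical_id = data[0] & 0x0F  # low nibble of the first data byte
        if logical_id in seen_motor_ids:
            conflicted_motor_ids.add(logical_id)
        if arb_id in seen_arb_ids:
            conflicted_arb_ids.add(arb_id)
        seen_motor_ids.add(logical_id)
        seen_arb_ids.add(arb_id)

    return conflicted_motor_ids, conflicted_arb_ids


# Made-up frames: logical ID 1 appears twice, arbitration ID 0x000 appears twice.
frames = [
    (0x000, bytes([0x11, 0, 0, 0, 0, 0, 0, 0])),
    (0x000, bytes([0x12, 0, 0, 0, 0, 0, 0, 0])),
    (0x003, bytes([0x11, 0, 0, 0, 0, 0, 0, 0])),
    (0x004, bytes([0x05, 0, 0])),  # short frame, ignored
]
motor_conflicts, arb_conflicts = find_conflicts(frames)
print(sorted(motor_conflicts), sorted(arb_conflicts))
```

When a logical motor ID conflict is detected, `scan_motors` skips the register read and the register table for that scan.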

Source code in damiao_motor/cli.py
def scan_motors(
    channel: str = "can0",
    bustype: str = "socketcan",
    motor_ids: list[int] | None = None,
    duration_s: float = 3.0,
    bitrate: int = 1000000,
    debug: bool = False,
    *,
    motor_type: str,
) -> Set[int]:
    """
    Scan for connected motors by sending zero commands and listening for feedback.

    Args:
        channel: CAN channel (e.g., "can0")
        bustype: CAN bus type (e.g., "socketcan")
        motor_ids: List of motor IDs to test. If None, tests IDs 0x01-0x10.
        duration_s: How long to listen for responses (seconds)
        bitrate: CAN bitrate in bits per second, used when bringing up a socketcan interface
        debug: If True, print sent commands and raw received CAN messages
        motor_type: Motor type for P/V/T presets (e.g. 4340, 4310, 3507). Required.

    Returns:
        Set of motor IDs that responded with feedback.
    """
    if motor_ids is None:
        motor_ids = list(range(0x01, 0x11))  # Test IDs 1-16

    # Open scan status box (80 chars wide, 78 interior)
    print(f"{BOX_CORNER_TL}{BOX_HORIZONTAL * 78}{BOX_CORNER_TR}")

    # Check and bring up CAN interface if needed (only for socketcan)
    if bustype == "socketcan":
        line_text = f" Checking CAN interface {channel}..."
        print(f"{BOX_VERTICAL}{pad_with_ansi(line_text, 78)}{BOX_VERTICAL}")
        if not check_and_bring_up_can_interface(channel, bitrate=bitrate):
            warning_text = f" {YELLOW}⚠ Warning: Could not verify {channel} is ready. Continuing anyway...{RESET}"
            print(f"{BOX_VERTICAL}{pad_with_ansi(warning_text, 78)}{BOX_VERTICAL}")
        else:
            # Verify interface is actually up and working
            verify_result = subprocess.run(
                ["ip", "link", "show", channel],
                capture_output=True,
                text=True,
                check=False,
            )
            if verify_result.returncode == 0 and "state UP" in verify_result.stdout:
                ready_text = f" {GREEN}✓ CAN interface {channel} is ready{RESET}"
                print(f"{BOX_VERTICAL}{pad_with_ansi(ready_text, 78)}{BOX_VERTICAL}")
            else:
                warning_text = f" {YELLOW}⚠ Warning: {channel} may not be properly configured{RESET}"
                print(f"{BOX_VERTICAL}{pad_with_ansi(warning_text, 78)}{BOX_VERTICAL}")

    controller = DaMiaoController(channel=channel, bustype=bustype)

    # Flush any pending messages from the bus
    line_text = " Flushing CAN bus buffer..."
    print(f"{BOX_VERTICAL}{pad_with_ansi(line_text, 78)}{BOX_VERTICAL}")
    flushed_count = controller.flush_bus()
    if flushed_count > 0:
        flushed_text = f"   {GREEN}Flushed {flushed_count} pending message(s) from bus{RESET}"
        print(f"{BOX_VERTICAL}{pad_with_ansi(flushed_text, 78)}{BOX_VERTICAL}")
    else:
        line_text = "   Bus buffer is clean"
        print(f"{BOX_VERTICAL}{pad_with_ansi(line_text, 78)}{BOX_VERTICAL}")

    motors: dict[int, DaMiaoMotor] = {}

    # Create motor instances for all IDs we want to test
    for motor_id in motor_ids:
        try:
            motor = controller.add_motor(motor_id=motor_id, feedback_id=0x00, motor_type=motor_type)
            motors[motor_id] = motor
        except ValueError:
            # Motor already exists, skip
            pass

    # Send zero command to all motors
    line_text = f" Sending zero command to {len(motors)} potential motor IDs..."
    print(f"{BOX_VERTICAL}{pad_with_ansi(line_text, 78)}{BOX_VERTICAL}")
    try:
        for motor in motors.values():
            motor.send_cmd(target_position=0.0, target_velocity=0.0, stiffness=0.0, damping=0.0, feedforward_torque=0.0)
            if debug:
                # Print sent command in debug mode
                cmd_data = motor.encode_cmd_msg(0.0, 0.0, 0.0, 0.0, 0.0)
                data_hex = " ".join(f"{b:02X}" for b in cmd_data)
                sent_text = f"   [SENT] 0x{motor.motor_id:03X} [{data_hex}]"
                print(f"{BOX_VERTICAL}{pad_with_ansi(sent_text, 78)}{BOX_VERTICAL}")
    except Exception as e:
        error_str = str(e)
        if "Error Code 80" in error_str or "No buffer space available" in error_str or "[Errno 80]" in error_str:
            error_lines = [
                "Original error: " + str(e),
                "",
                "This error typically indicates:",
                "  • No CAN device (motor) is connected to the bus",
                "  • Motor(s) are not powered on",
                "  • CAN interface hardware issue",
                "",
                "Please check:",
                "  1. Motor(s) are properly connected to the CAN bus",
                "  2. Motor(s) are powered on",
                "  3. CAN interface hardware is working correctly",
                "  4. CAN bus termination resistors (120Ω) are installed at both ends",
            ]
            print_error_box("[ERROR CODE 80] No buffer space available when sending commands", error_lines, width=70)
            # Clean up and exit gracefully
            try:
                controller.bus.shutdown()
            except Exception:
                # Best-effort cleanup; the process exits right after.
                pass
            sys.exit(1)
        else:
            raise

    # Listen for feedback
    line_text = f" Listening for responses for {duration_s} seconds..."
    print(f"{BOX_VERTICAL}{pad_with_ansi(line_text, 78)}{BOX_VERTICAL}")
    start_time = time.perf_counter()
    responded_ids: Set[int] = set()
    debug_messages = []  # Collect debug messages if debug mode is enabled
    # Track seen motor IDs and arbitration IDs for conflict detection
    seen_motor_ids: Set[int] = set()  # Track decoded motor IDs (logical_id)
    seen_arbitration_ids: Set[int] = set()  # Track arbitration IDs
    # Collect conflicts to group them at the end
    conflicted_motor_ids: Set[int] = set()  # Motor IDs that appeared multiple times
    conflicted_arbitration_ids: Set[int] = set()  # Arbitration IDs that appeared multiple times
    # Collect motor register information for table display
    motor_registers: Dict[int, Dict[int, float | int]] = {}  # motor_id -> {rid -> value}

    while time.perf_counter() - start_time < duration_s:
        # Debug mode: collect and print raw messages immediately
        if debug:
            # Read and collect raw messages, then process normally
            while True:
                msg = controller.bus.recv(timeout=0)
                if msg is None:
                    break
                data_hex = " ".join(f"{b:02X}" for b in msg.data)
                debug_msg = f"  0x{msg.arbitration_id:03X} [{data_hex}]"
                debug_messages.append(debug_msg)
                # Print immediately in debug mode
                print(debug_msg)
                # Process the message manually for debug mode
                if len(msg.data) == 8:
                    logical_id = msg.data[0] & 0x0F
                    arb_id = msg.arbitration_id

                    # Check for motor ID conflict (same decoded motor ID seen twice)
                    if logical_id in seen_motor_ids:
                        conflicted_motor_ids.add(logical_id)

                    # Check for arbitration ID conflict (same arbitration ID seen twice)
                    if arb_id in seen_arbitration_ids:
                        conflicted_arbitration_ids.add(arb_id)

                    seen_motor_ids.add(logical_id)
                    seen_arbitration_ids.add(arb_id)

                    motor = controller._motors_by_feedback.get(logical_id)
                    if motor is not None:
                        motor.decode_sensor_feedback(bytes(msg.data), arbitration_id=arb_id)
        else:
            # Normal mode: read messages, check conflicts, then process
            while True:
                msg = controller.bus.recv(timeout=0)
                if msg is None:
                    break

                if len(msg.data) == 8:
                    logical_id = msg.data[0] & 0x0F
                    arb_id = msg.arbitration_id

                    # Check for motor ID conflict (same decoded motor ID seen twice)
                    if logical_id in seen_motor_ids:
                        conflicted_motor_ids.add(logical_id)

                    # Check for arbitration ID conflict (same arbitration ID seen twice)
                    if arb_id in seen_arbitration_ids:
                        conflicted_arbitration_ids.add(arb_id)

                    seen_motor_ids.add(logical_id)
                    seen_arbitration_ids.add(arb_id)

                    # Process through controller
                    motor = controller._motors_by_feedback.get(logical_id)
                    if motor is not None:
                        motor.decode_sensor_feedback(bytes(msg.data), arbitration_id=arb_id)

        # Check which motors have received feedback
        for motor_id, motor in motors.items():
            if motor.state and motor.state.get("can_id") is not None:
                # Print once per motor when first detected
                if motor_id not in responded_ids:
                    state_name = motor.state.get("status", "UNKNOWN")
                    pos = motor.state.get("pos", 0.0)
                    arb_id = motor.state.get("arbitration_id")
                    if arb_id is not None:
                        motor_text = f"   {GREEN}✓ Motor ID 0x{motor_id:02X}{RESET} responded (arb_id: 0x{arb_id:03X}, state: {state_name}, pos: {pos:.3f})"
                        print(f"{BOX_VERTICAL}{pad_with_ansi(motor_text, 78)}{BOX_VERTICAL}")
                    else:
                        motor_text = f"   {GREEN}✓ Motor ID 0x{motor_id:02X}{RESET} responded (state: {state_name}, pos: {pos:.3f})"
                        print(f"{BOX_VERTICAL}{pad_with_ansi(motor_text, 78)}{BOX_VERTICAL}")

                responded_ids.add(motor_id)

        time.sleep(0.01)

    # Print conflicts (grouped)
    if conflicted_motor_ids:
        error_lines = [
            "Multiple motors responded with the same motor ID.",
            "This indicates multiple motors are configured with the same motor ID.",
            f"Conflicted Motor IDs: {', '.join(f'0x{mid:02X}' for mid in sorted(conflicted_motor_ids))}"
        ]
        print_error_box("[ERROR] Motor ID Conflicts Detected", error_lines)

    if conflicted_arbitration_ids:
        warning_lines = [
            "Same arbitration ID seen multiple times.",
            "This may indicate a CAN bus configuration issue.",
            f"Conflicted Arbitration IDs: {', '.join(f'0x{aid:03X}' for aid in sorted(conflicted_arbitration_ids))}"
        ]
        print_warning_box("[WARNING] Arbitration ID Conflicts Detected", warning_lines)

    # Close the scan status box
    print(f"{BOX_CORNER_BL}{BOX_HORIZONTAL * 78}{BOX_CORNER_BR}")

    # Read all registers from detected motors if no motor ID conflicts
    if not conflicted_motor_ids and responded_ids:
        print("Reading register parameters from detected motors...")
        for motor_id in sorted(responded_ids):
            motor = motors.get(motor_id)
            if motor is not None:
                try:
                    registers = motor.read_all_registers(timeout=0.05)
                    motor_registers[motor_id] = registers
                except Exception as e:
                    print(f"  {YELLOW}⚠ Failed to read registers from motor 0x{motor_id:02X}: {e}{RESET}")
        print()

    # Print motor register table if no motor ID conflicts
    if not conflicted_motor_ids and motor_registers:
        # Start register table box
        print()
        top_border = f"{BOX_CORNER_TL}{BOX_HORIZONTAL * 78}{BOX_CORNER_TR}"
        print(top_border)
        # Header line
        header_text = f" {GREEN}Detected Motors - Register Parameters{RESET}"
        print(f"{BOX_VERTICAL}{pad_with_ansi(header_text, 78)}{BOX_VERTICAL}")

        # Group registers by motor
        for motor_id in sorted(motor_registers.keys()):
            registers = motor_registers[motor_id]
            # Separator line before motor section
            print(f"{BOX_JOIN_LEFT}{BOX_HORIZONTAL * 78}{BOX_JOIN_RIGHT}")
            # Motor ID header - use pad_with_ansi to account for color codes
            motor_id_text = f" {GREEN}Motor ID: 0x{motor_id:02X} ({motor_id}){RESET}"
            print(f"{BOX_VERTICAL}{pad_with_ansi(motor_id_text, 78)}{BOX_VERTICAL}")
            # Separator line
            print(f"{BOX_JOIN_LEFT}{BOX_HORIZONTAL * 78}{BOX_JOIN_RIGHT}")
            # Table header - adjust column widths to fit within 78 chars
            # Format: " RID(4) Var(10) Desc(32) Value(12) Type(8) Access(6)" = 78 total
            # Calculation: 1+4+1+10+1+32+1+12+1+8+1+6 = 78
            header_content = f" {'RID':<4} {'Variable':<10} {'Description':<32} {'Value':<12} {'Type':<8} {'Access':<6}"
            print(f"{BOX_VERTICAL}{pad_with_ansi(header_content, 78)}{BOX_VERTICAL}")
            # Header separator
            print(f"{BOX_JOIN_LEFT}{BOX_HORIZONTAL * 78}{BOX_JOIN_RIGHT}")

            for rid in sorted(registers.keys()):
                if rid not in REGISTER_TABLE:
                    continue

                reg_info = REGISTER_TABLE[rid]
                value = registers[rid]

                # Format value based on type
                if isinstance(value, str) and value.startswith("ERROR"):
                    value_str = value
                elif reg_info.data_type == "float":
                    value_str = f"{float(value):.2f}"
                else:
                    value_str = str(int(value))

                # Truncate long descriptions to fit (32 chars for desc column)
                desc = (reg_info.description[:30] + "..") if len(reg_info.description) > 32 else reg_info.description

                # Format table row - match header column widths
                row_content = f" {rid:<4} {reg_info.variable:<10} {desc:<32} {value_str:<12} {reg_info.data_type:<8} {reg_info.access:<6}"
                print(f"{BOX_VERTICAL}{pad_with_ansi(row_content, 78)}{BOX_VERTICAL}")

        # Close the box
        print(f"{BOX_CORNER_BL}{BOX_HORIZONTAL * 78}{BOX_CORNER_BR}")

    # Print debug summary if messages were collected
    if debug and debug_messages:
        print()
        print_section_header(f"DEBUG: Total {len(debug_messages)} raw CAN messages received", width=80)
        print(f"{BOX_CORNER_BL}{BOX_HORIZONTAL * 78}{BOX_CORNER_BR}")

    # Cleanup
    try:
        controller.bus.shutdown()
    except Exception:
        # Best-effort cleanup; shutdown errors are ignored.
        pass

    return responded_ids
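
The scan output is drawn inside fixed-width boxes, and colored lines are padded with `pad_with_ansi(text, 78)`. Its implementation is not shown on this page. A helper of that kind generally has to ignore ANSI color codes when measuring width, since they add characters to the string but take up no screen columns. A minimal sketch under that assumption; `pad_visible` and `ANSI_ESCAPE` are hypothetical names, and the real helper may behave differently:

```python
import re

# Matches SGR color/style sequences such as "\x1b[92m" and "\x1b[0m".
ANSI_ESCAPE = re.compile(r"\x1b\[[0-9;]*m")


def pad_visible(text: str, width: int) -> str:
    """Right-pad text with spaces so its visible length is at least width."""
    visible_len = len(ANSI_ESCAPE.sub("", text))
    # Text that is already too wide is returned unchanged, not truncated.
    return text + " " * max(0, width - visible_len)


GREEN, RESET = "\x1b[92m", "\x1b[0m"
row = pad_visible(f" {GREEN}ok{RESET}", 10)
print(f"|{row}|")
```

Padding with plain `str.ljust` would count the escape sequences as visible characters and leave colored rows shorter than uncolored ones, which is why the visible length is measured separately.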