Controlling APC PDUs with a Nautobot Job

10 min read Nautobot · Python · Network Automation

Introduction

Every lab and datacenter eventually runs into the same small problem. A device locks up, or you need to power-cycle a stuck appliance, and the only way to do it is to walk to the rack or hunt through a PDU’s web interface for the right outlet. Switched PDUs solve the physical side of this, but the mapping between “this device” and “that outlet on that PDU” usually lives in someone’s head or a spreadsheet.

Nautobot already models that mapping. If you cable a device’s power ports to PDU outlets in the source of truth, Nautobot knows exactly which outlets feed which device. I wanted to turn that knowledge into an action, so I wrote a Nautobot Job that powers APC PDU outlets on or off, or reads their status, for the devices you select.

How the job works

The flow is straightforward once you lean on the data Nautobot already has:

  1. You select one or more devices (not the PDUs themselves) in the job form, along with an action: STATUS, ON, or OFF.
  2. For each device, the job follows its power ports through the cabling to the connected PowerOutlet on the feeding PDU.
  3. The APC CLI outlet number is parsed from the outlet name.
  4. The job connects to each PDU over SSH and runs the matching APC command, then reads the outlet status back to confirm the result.

Because the cabling already records which outlet feeds which device, you never type an outlet number or an IP address. You pick a device and pick an action. A failure on one device does not stop the run either: if you select twenty devices and one PDU is unreachable, the job still processes the other nineteen, then marks the JobResult as failed with a summary of what went wrong.

Mapping cables from device to PDU

The whole job rests on the power cabling you model in Nautobot. A device has power ports, each PDU has power outlets, and a cable connects the two. A dual-fed device with redundant power supplies looks like this:

  Device: server-01                         PDUs
  ┌─────────────────┐
  │  Power Supply 1 ├───── cable ─────► [ pdu-a : Power Outlet 17 ]
  │  Power Supply 2 ├───── cable ─────► [ pdu-b : Power Outlet 9  ]
  └─────────────────┘

The job walks each of the device’s power ports, follows the cable to its connected endpoint, and keeps the endpoints that belong to a PDU. An uncabled port has no endpoint at all, and a power feed has no parent device, so both are skipped and reported as unconnected:

def resolve_pdu_outlets(device):
    """Resolve ``device``'s power cabling to PDU outlets."""
    pdu_outlets = {}
    unconnected = []
    for power_port in device.power_ports.all():
        endpoint = power_port.connected_endpoint
        pdu = getattr(endpoint, "device", None)
        if pdu is None:
            unconnected.append(power_port.name)
            continue
        pdu_outlets.setdefault(pdu, []).append(parse_outlet_number(endpoint.name))
    return (
        {pdu: sorted(ids) for pdu, ids in pdu_outlets.items()},
        unconnected,
    )

The result groups outlets by their feeding PDU, so the server-01 example above resolves to {pdu-a: [17], pdu-b: [9]}. The job then handles each PDU connection on its own.
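To see the grouping without a Nautobot instance, here is a minimal sketch that runs the same function against stand-in objects. `FakePortManager` and `fake_port` are hypothetical test helpers, not Nautobot classes, and the PDUs are plain strings where the real job would hold `Device` objects.

```python
import re
from types import SimpleNamespace

_OUTLET_NUMBER_RE = re.compile(r"(\d+)\s*$")


def parse_outlet_number(outlet_name):
    """Return the trailing integer of ``outlet_name``."""
    match = _OUTLET_NUMBER_RE.search(outlet_name or "")
    if match is None:
        raise ValueError(f"Outlet name {outlet_name!r} has no trailing outlet number.")
    return int(match.group(1))


def resolve_pdu_outlets(device):
    """Same logic as the article's function, repeated so the sketch is self-contained."""
    pdu_outlets, unconnected = {}, []
    for power_port in device.power_ports.all():
        endpoint = power_port.connected_endpoint
        pdu = getattr(endpoint, "device", None)
        if pdu is None:
            unconnected.append(power_port.name)
            continue
        pdu_outlets.setdefault(pdu, []).append(parse_outlet_number(endpoint.name))
    return {pdu: sorted(ids) for pdu, ids in pdu_outlets.items()}, unconnected


class FakePortManager:
    """Hypothetical stand-in for the ``device.power_ports`` related manager."""

    def __init__(self, ports):
        self._ports = ports

    def all(self):
        return list(self._ports)


def fake_port(name, pdu=None, outlet_name=None):
    """Build a stand-in power port; with no ``pdu`` the port is left uncabled."""
    endpoint = SimpleNamespace(device=pdu, name=outlet_name) if pdu is not None else None
    return SimpleNamespace(name=name, connected_endpoint=endpoint)


server = SimpleNamespace(
    name="server-01",
    power_ports=FakePortManager(
        [
            fake_port("Power Supply 1", "pdu-a", "Power Outlet 17"),
            fake_port("Power Supply 2", "pdu-b", "Power Outlet 9"),
            fake_port("Power Supply 3"),  # uncabled spare
        ]
    ),
)

outlets, unconnected = resolve_pdu_outlets(server)
print(outlets)      # {'pdu-a': [17], 'pdu-b': [9]}
print(unconnected)  # ['Power Supply 3']
```

The uncabled third supply ends up in the unconnected list instead of raising, which is what lets the job warn about it and carry on.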

The one assumption here is the outlet number. Nautobot names the outlet, and the APC CLI addresses it by an integer. The job bridges the two by reading the trailing integer of the outlet name, so Power Outlet 17 becomes 17:

import re

_OUTLET_NUMBER_RE = re.compile(r"(\d+)\s*$")


def parse_outlet_number(outlet_name):
    """Return the trailing integer of ``outlet_name`` (the APC CLI outlet number)."""
    match = _OUTLET_NUMBER_RE.search(outlet_name or "")
    if match is None:
        raise ValueError(f"Outlet name {outlet_name!r} has no trailing outlet number.")
    return int(match.group(1))

Name your outlets so the trailing number matches the physical outlet on the PDU and the mapping takes care of itself.
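A few sample names show where the trailing-integer rule works and where it does not. This is a small sketch; the outlet names are made up, and `try_parse` is a hypothetical wrapper added only so the failures can be shown next to the successes.

```python
import re

_OUTLET_NUMBER_RE = re.compile(r"(\d+)\s*$")


def parse_outlet_number(outlet_name):
    """Return the trailing integer of ``outlet_name``."""
    match = _OUTLET_NUMBER_RE.search(outlet_name or "")
    if match is None:
        raise ValueError(f"Outlet name {outlet_name!r} has no trailing outlet number.")
    return int(match.group(1))


def try_parse(outlet_name):
    """Return the parsed number, or None when the name has no trailing integer."""
    try:
        return parse_outlet_number(outlet_name)
    except ValueError:
        return None


samples = [
    "Power Outlet 17",    # the naming used in this article
    "Outlet 9 ",          # trailing whitespace is tolerated
    "Bank 2 Outlet 04",   # only the last number counts; leading zeros are dropped
    "Outlet 4 (bank 2)",  # number is not at the end, so parsing fails
    "Spare",              # no number at all
]
results = {name: try_parse(name) for name in samples}
for name, number in results.items():
    print(f"{name!r:22} -> {number}")
```

The fourth sample is the one to watch for: a suffix after the number breaks the mapping, so any bank or phase label has to come before the outlet number.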

The APC commands

APC’s Network Management Card exposes a small CLI over SSH. The job uses three commands:

  • olStatus <ids> reads the current state of the listed outlets.
  • olOn <ids> powers the listed outlets on.
  • olOff <ids> powers the listed outlets off.

Outlet IDs are passed as a comma-separated list, so a device on outlets 5 and 6 of the same PDU becomes a single olOn 5,6. On success, the NMC echoes the result code E000. The job treats anything else as a failure, and it matches the code as a whole token so a value like E0001 does not pass:

import re

APC_SUCCESS_CODE = "E000"
_SUCCESS_CODE_RE = re.compile(rf"\b{APC_SUCCESS_CODE}\b")


class PduCommandError(RuntimeError):
    """Raised when the PDU does not acknowledge a command with the success code."""


def check_success(output):
    """Return ``output`` if it contains the APC success code, else raise."""
    if not _SUCCESS_CODE_RE.search(output or ""):
        raise PduCommandError(f"APC command did not report {APC_SUCCESS_CODE} success:\n{output}")
    return output
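The two details above, the comma-separated ID list and the whole-token match, can be checked in isolation. This is a minimal sketch: `build_outlet_command` and `reports_success` are hypothetical helper names, and the response strings are illustrative samples rather than captured NMC output.

```python
import re

APC_SUCCESS_CODE = "E000"
_SUCCESS_CODE_RE = re.compile(rf"\b{APC_SUCCESS_CODE}\b")


def build_outlet_command(verb, outlet_ids):
    """Join outlet IDs into the comma-separated form the APC CLI expects."""
    ids = ",".join(str(i) for i in sorted(set(outlet_ids)))
    return f"{verb} {ids}"


def reports_success(output):
    """True only when the success code appears as a whole token."""
    return bool(_SUCCESS_CODE_RE.search(output or ""))


command = build_outlet_command("olOn", [6, 5])
print(command)  # olOn 5,6

# Illustrative responses (assumed, not captured from a device).
print(reports_success("E000: Success"))          # True
print(reports_success("E0001: Success"))         # False, longer code is a different token
print(reports_success("E102: Parameter Error"))  # False
print(reports_success(None))                     # False, empty output is a failure
```

The word boundaries do the work here. A plain substring test such as `"E000" in output` would accept `E0001`.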

For ON and OFF, the job sends the command, requires the E000 code, and then runs a follow-up olStatus on the same outlets to confirm they actually changed state. STATUS skips straight to the olStatus query. Reading the state back means the job reports what the PDU is actually doing, not just that it accepted the command.
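One way to turn the read-back into a hard check is to parse the olStatus output and compare each outlet with the expected state. The sketch below assumes status lines shaped like ` 5: Outlet 5: On`. That layout is an assumption, so verify it against your own NMC firmware before relying on the regex. `parse_outlet_states` and `unconfirmed_outlets` are hypothetical helpers, not part of the job shown in this article.

```python
import re

# Assumed line shape: "<id>: <outlet name>: On|Off". Adjust for your firmware.
_STATUS_LINE_RE = re.compile(r"^\s*(\d+):.*:\s*(On|Off)\b", re.IGNORECASE | re.MULTILINE)


def parse_outlet_states(status_output):
    """Map outlet number to "on" or "off" for every status line that parses."""
    found = _STATUS_LINE_RE.findall(status_output or "")
    return {int(number): state.lower() for number, state in found}


def unconfirmed_outlets(status_output, outlet_ids, expected):
    """Return the outlets that are missing from the output or not in ``expected`` state."""
    states = parse_outlet_states(status_output)
    return [outlet for outlet in outlet_ids if states.get(outlet) != expected]


# Illustrative output only, written to match the assumed line shape.
sample = "E000: Success\n 5: Outlet 5: On\n 6: Outlet 6: Off\n"

states = parse_outlet_states(sample)
print(states)                                     # {5: 'on', 6: 'off'}
print(unconfirmed_outlets(sample, [5, 6], "on"))  # [6]
print(unconfirmed_outlets(sample, [5, 7], "on"))  # [7], outlet 7 never appeared
```

Treating a missing outlet as unconfirmed keeps the check conservative: an outlet the PDU did not report is never counted as a success.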

The SSH session itself depends on your netmiko version. netmiko 4.7, released in May of this year, ships a dedicated apc_aos driver that knows the NMC’s apc> prompt and handles the login banner for you. If you are on 4.7 or newer, point ConnectHandler at device_type="apc_aos" and let the driver do the session setup.
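The version check is worth doing on parsed integers, not on strings, because `"4.10"` sorts before `"4.7"` as text. Here is a minimal sketch of that decision. `pick_device_type` is a hypothetical helper name, and the 4.7 threshold is the one described above.

```python
import re

APC_AOS_MIN_VERSION = (4, 7)  # threshold taken from the article text


def pick_device_type(netmiko_version):
    """Return "apc_aos" when the version is new enough, else "generic"."""
    match = re.match(r"(\d+)\.(\d+)", netmiko_version)
    if match is None:
        # Unparseable version string: fall back to the driver that always exists.
        return "generic"
    major_minor = (int(match.group(1)), int(match.group(2)))
    return "apc_aos" if major_minor >= APC_AOS_MIN_VERSION else "generic"


for version in ("4.6.2", "4.7.0", "4.10.1", "5.0.0", "4.7.0rc1", "unknown"):
    print(version, "->", pick_device_type(version))
```

Falling back to `generic` on an unparseable version is a deliberate choice in this sketch: the generic path works on every release, so it is the safer default.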

On older netmiko, that driver does not exist, and the Nautobot worker image still tends to lag behind. The job falls back to the generic driver in that case. The generic driver does no prompt detection of its own, so the job replicates what apc_aos would do: it drains the login banner by reading until the apc> prompt, then anchors the base prompt on the > terminator before sending any command:

connection = ConnectHandler(
    device_type="generic",
    host=host,
    username=username,
    password=password,
    conn_timeout=CONN_TIMEOUT,
    banner_timeout=BANNER_TIMEOUT,
)
try:
    connection.read_until_pattern(pattern=r"apc>", read_timeout=READ_TIMEOUT)
    connection.set_base_prompt(pri_prompt_terminator=">", alt_prompt_terminator=">")
    ...
finally:
    connection.disconnect()

All three timeouts (connection, banner, and read) are set to a generous 30 seconds. The APC NMC’s SSH stack negotiates slowly, around 8 seconds in my testing, which is close to netmiko’s 10-second default connection timeout and causes intermittent failures without the extra headroom.

Knowing the PDU’s SSH credentials

Credentials never go in the job form. Each APC PDU is modeled as a device in Nautobot with a Secrets Group assigned to it, and the job pulls the username and password from that group at run time. When it resolves a device to its feeding PDU, it reads that PDU’s Secrets Group to get the login.

It tries the SSH access type first, since that is the intuitive home for SSH credentials, and falls back to Generic so existing groups keep working:

for access_type in (
    SecretsGroupAccessTypeChoices.TYPE_SSH,
    SecretsGroupAccessTypeChoices.TYPE_GENERIC,
):
    try:
        username = secrets_group.get_secret_value(
            access_type=access_type,
            secret_type=SecretsGroupSecretTypeChoices.TYPE_USERNAME,
        )
        password = secrets_group.get_secret_value(
            access_type=access_type,
            secret_type=SecretsGroupSecretTypeChoices.TYPE_PASSWORD,
        )
    except ObjectDoesNotExist:
        continue
    return username, password
raise ValueError(f"Secrets Group {secrets_group.name} has no SSH or Generic username/password secrets.")

The practical setup is one Secrets Group per set of credentials, assigned to every APC PDU device that uses that login. Operators run the job by picking a device and an action. The secrets stay in Nautobot’s secret store, and a PDU with no Secrets Group assigned fails fast with a clear message.
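The fallback order can be exercised without Nautobot by swapping in stand-ins. In this sketch `FakeSecretsGroup` and `MissingSecret` are hypothetical replacements for a real Secrets Group and for Django’s `ObjectDoesNotExist`, and the access types are plain strings, not the `SecretsGroupAccessTypeChoices` constants.

```python
class MissingSecret(LookupError):
    """Stand-in for django.core.exceptions.ObjectDoesNotExist."""


class FakeSecretsGroup:
    """Stand-in Secrets Group backed by a {(access_type, secret_type): value} dict."""

    def __init__(self, name, secrets):
        self.name = name
        self._secrets = secrets

    def get_secret_value(self, access_type, secret_type):
        try:
            return self._secrets[(access_type, secret_type)]
        except KeyError:
            raise MissingSecret(f"{access_type}/{secret_type}") from None


def lookup_credentials(group, access_types=("SSH", "Generic")):
    """Return (access_type, username, password) from the first complete pair."""
    for access_type in access_types:
        try:
            username = group.get_secret_value(access_type, "username")
            password = group.get_secret_value(access_type, "password")
        except MissingSecret:
            continue  # incomplete under this access type, try the next one
        return access_type, username, password
    raise ValueError(f"Secrets Group {group.name} has no SSH or Generic username/password secrets.")


legacy = FakeSecretsGroup("legacy", {("Generic", "username"): "apc", ("Generic", "password"): "pw1"})
partial = FakeSecretsGroup(
    "partial",
    {
        ("SSH", "username"): "admin",  # no SSH password, so SSH is incomplete
        ("Generic", "username"): "apc",
        ("Generic", "password"): "pw1",
    },
)

print(lookup_credentials(legacy))   # ('Generic', 'apc', 'pw1')
print(lookup_credentials(partial))  # ('Generic', 'apc', 'pw1')
```

The `partial` case shows a property of the loop that is easy to miss: a username and password are never mixed across access types, because a missing half discards the whole pair.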

Job buttons for one-click power

The bulk job is good for batches, but the most common need is power-cycling one device you are already looking at. For that, the same logic is exposed as three JobButtonReceiver jobs: Status, On, and Off. Grouped under a single Job Button dropdown on the device detail page, they let you act on the device in front of you with one click, no form to fill out. They share all the connect-and-command logic with the bulk job through a mixin, so a button and a bulk run behave identically.

Power-off is the one action that can cause real damage. A misclick on the wrong device can take down a core switch, a storage controller, or anything else the rest of the lab depends on, so the OFF path needs a guardrail that refuses to power off protected gear before any SSH connection is even attempted.

What counts as protected should be flexible. Different shops draw the line in different places, so the guard should be able to match on an assigned tag, a device’s role, its tenant, or an individual device name. A tag like Power Protected lets you opt specific devices in or out with a single label, without reorganizing your data model. Grouping by role catches a whole class of gear in one rule. Grouping by tenant protects a whole customer’s footprint. A per-device entry covers the one box that does not fit any group.

Wherever the line is drawn, the check belongs in the server-side job logic so it applies to the button, the bulk job, and any API call alike. You can also hide the Off button on protected devices with conditional Jinja, but that is cosmetic polish on top of the real enforcement in the code path.
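A guard that matches on tag, role, tenant, or device name could look like the sketch below. It is one possible shape, not the job’s actual implementation: `ProtectionRules`, `protection_reasons`, and `guard_power_off` are hypothetical names, the rule values are made up, and `fake_device` stands in for a Nautobot `Device` with `tags`, `role`, and `tenant` attributes.

```python
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass(frozen=True)
class ProtectionRules:
    """Hypothetical rule set; an empty field disables that kind of match."""

    tags: frozenset = frozenset()
    roles: frozenset = frozenset()
    tenants: frozenset = frozenset()
    device_names: frozenset = frozenset()


def protection_reasons(device, rules):
    """Return human-readable reasons why ``device`` must not be powered off."""
    tag_names = {tag.name for tag in device.tags.all()}
    reasons = [f"tag {name!r}" for name in sorted(tag_names & rules.tags)]
    role_name = getattr(getattr(device, "role", None), "name", None)
    if role_name in rules.roles:
        reasons.append(f"role {role_name!r}")
    tenant_name = getattr(getattr(device, "tenant", None), "name", None)
    if tenant_name in rules.tenants:
        reasons.append(f"tenant {tenant_name!r}")
    if device.name in rules.device_names:
        reasons.append("device name")
    return reasons


def guard_power_off(device, rules):
    """Raise before any SSH connection is attempted if ``device`` is protected."""
    reasons = protection_reasons(device, rules)
    if reasons:
        raise PermissionError(f"Refusing to power OFF {device.name}: protected by {', '.join(reasons)}.")


def fake_device(name, tags=(), role=None, tenant=None):
    """Stand-in for a Nautobot Device, for illustration only."""
    return SimpleNamespace(
        name=name,
        tags=SimpleNamespace(all=lambda: [SimpleNamespace(name=t) for t in tags]),
        role=SimpleNamespace(name=role) if role else None,
        tenant=SimpleNamespace(name=tenant) if tenant else None,
    )


rules = ProtectionRules(
    tags=frozenset({"Power Protected"}),
    roles=frozenset({"Core Switch"}),
    device_names=frozenset({"san-ctrl-01"}),
)

print(protection_reasons(fake_device("core-sw-01", role="Core Switch"), rules))
print(protection_reasons(fake_device("lab-srv-07", tags=["Lab"]), rules))  # []
```

Returning the reasons instead of a bare boolean lets the job log exactly which rule blocked the power-off, which helps when an operator asks why the button refused.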

Putting it all together

Here is a trimmed sample Nautobot job that ties the pieces together: the cable-to-outlet mapping, the credential lookup, the APC commands, the destructive power-off guard, the bulk job, and the per-device buttons. It is condensed for readability, but it follows the same structure as the full version.

"""Nautobot Job that controls APC PDU outlets over SSH."""

import re

from django.core.exceptions import ObjectDoesNotExist
from nautobot.apps.jobs import (
    ChoiceVar,
    Job,
    JobButtonReceiver,
    MultiObjectVar,
    register_jobs,
)
from nautobot.dcim.models import Device
from nautobot.extras.choices import (
    SecretsGroupAccessTypeChoices,
    SecretsGroupSecretTypeChoices,
)
from netmiko import ConnectHandler
from netmiko import __version__ as netmiko_version

ACTION_STATUS, ACTION_ON, ACTION_OFF = "status", "on", "off"
ACTION_CHOICES = ((ACTION_STATUS, "STATUS"), (ACTION_ON, "ON"), (ACTION_OFF, "OFF"))
ACTION_COMMANDS = {ACTION_ON: "olOn", ACTION_OFF: "olOff"}
STATUS_COMMAND = "olStatus"

# Tags whose devices may never be powered off. Swap or extend this for a
# role/tenant/name match to fit how your shop draws the line.
PROTECTED_OFF_TAGS = {"Power Protected"}

APC_PROMPT_PATTERN = r"apc>"
APC_SUCCESS_CODE = "E000"
_SUCCESS_CODE_RE = re.compile(rf"\b{APC_SUCCESS_CODE}\b")
_OUTLET_NUMBER_RE = re.compile(r"(\d+)\s*$")
CONN_TIMEOUT = BANNER_TIMEOUT = READ_TIMEOUT = 30


def parse_outlet_number(name):
    """Return the trailing integer of an outlet name ("Power Outlet 17" -> 17)."""
    match = _OUTLET_NUMBER_RE.search(name or "")
    if match is None:
        raise ValueError(f"Outlet name {name!r} has no trailing outlet number.")
    return int(match.group(1))


def resolve_pdu_outlets(device):
    """Map a device's power cabling to ``{pdu: [outlet_ids]}`` plus unconnected ports."""
    pdu_outlets, unconnected = {}, []
    for power_port in device.power_ports.all():
        endpoint = power_port.connected_endpoint
        pdu = getattr(endpoint, "device", None)
        if pdu is None:
            unconnected.append(power_port.name)
            continue
        pdu_outlets.setdefault(pdu, []).append(parse_outlet_number(endpoint.name))
    return {pdu: sorted(ids) for pdu, ids in pdu_outlets.items()}, unconnected


def is_off_blocked(action, tag_names):
    """True if ``action`` would power OFF a device carrying a protected tag."""
    return action == ACTION_OFF and bool(PROTECTED_OFF_TAGS & set(tag_names))


class PduActionMixin:
    """Shared connect-and-command logic for the bulk job and the buttons."""

    def _process_device(self, device, action):
        tag_names = {tag.name for tag in device.tags.all()}
        if is_off_blocked(action, tag_names):
            raise RuntimeError(f"Refusing to power OFF {device.name}: a protected tag is set.")
        pdu_outlets, unconnected = resolve_pdu_outlets(device)
        for port_name in unconnected:
            self.logger.warning("Power port %s on %s is unconnected.", port_name, device.name)
        if not pdu_outlets:
            raise ValueError(f"No PDU connected to {device.name}")
        for pdu, outlet_ids in pdu_outlets.items():
            username, password = self._pdu_credentials(pdu)
            self._run_on_pdu(pdu, action, outlet_ids, username, password)

    def _pdu_credentials(self, pdu):
        """Read the PDU's assigned Secrets Group, SSH access type first."""
        group = pdu.secrets_group
        if group is None:
            raise ValueError(f"PDU {pdu.name} has no Secrets Group assigned.")
        for access in (SecretsGroupAccessTypeChoices.TYPE_SSH, SecretsGroupAccessTypeChoices.TYPE_GENERIC):
            try:
                username = group.get_secret_value(access, SecretsGroupSecretTypeChoices.TYPE_USERNAME)
                password = group.get_secret_value(access, SecretsGroupSecretTypeChoices.TYPE_PASSWORD)
            except ObjectDoesNotExist:
                continue
            return username, password
        raise ValueError(f"Secrets Group {group.name} has no SSH/Generic username/password.")

    def _connect(self, host, username, password):
        """Use apc_aos on netmiko 4.7+, else prepare the generic driver by hand."""
        if tuple(int(p) for p in netmiko_version.split(".")[:2]) >= (4, 7):
            return ConnectHandler(
                device_type="apc_aos", host=host, username=username, password=password,
                conn_timeout=CONN_TIMEOUT, banner_timeout=BANNER_TIMEOUT,
            )
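        conn = ConnectHandler(
            device_type="generic", host=host, username=username, password=password,
            conn_timeout=CONN_TIMEOUT, banner_timeout=BANNER_TIMEOUT,
        )
        try:
            # Drain the login banner, then anchor the prompt the way apc_aos would.
            conn.read_until_pattern(pattern=APC_PROMPT_PATTERN, read_timeout=READ_TIMEOUT)
            conn.set_base_prompt(pri_prompt_terminator=">", alt_prompt_terminator=">")
        except Exception:
            conn.disconnect()  # do not leak the SSH session if prompt setup fails
            raise
        return conn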

    def _run_on_pdu(self, pdu, action, outlet_ids, username, password):
        host = str(pdu.primary_ip.host) if pdu.primary_ip else pdu.name
        ids = ",".join(str(i) for i in outlet_ids)
        conn = self._connect(host, username, password)
        try:
            verb = ACTION_COMMANDS.get(action)
            if verb is not None:
                output = conn.send_command(f"{verb} {ids}", read_timeout=READ_TIMEOUT)
                if not _SUCCESS_CODE_RE.search(output or ""):
                    raise RuntimeError(f"{pdu.name}: command did not report {APC_SUCCESS_CODE}.")
            status = conn.send_command(f"{STATUS_COMMAND} {ids}", read_timeout=READ_TIMEOUT)
            self.logger.info("%s outlets %s:\n%s", pdu.name, ids, status)
        finally:
            conn.disconnect()

    def _run_button(self, device, action):
        try:
            self._process_device(device, action)
        except Exception as err:  # pylint: disable=broad-exception-caught
            self.logger.error("Device %s failed: %s", device.name, err)
            raise


class PduPowerControl(PduActionMixin, Job):
    """Power APC PDU outlets on/off or read status for the selected devices."""

    devices = MultiObjectVar(model=Device, description="Devices fed by APC PDU outlets.")
    action = ChoiceVar(choices=ACTION_CHOICES, description="Outlet action to perform.")

    class Meta:
        name = "PDU Power Control"
        has_sensitive_variables = False
        soft_time_limit = 1800
        time_limit = 1860

    def run(self, devices, action):  # pylint: disable=arguments-differ
        self.logger.info("Using netmiko %s.", netmiko_version)
        failed = []
        for device in devices:
            try:
                self._process_device(device, action)
            except Exception as err:  # pylint: disable=broad-exception-caught
                self.logger.error("Device %s failed: %s", device.name, err)
                failed.append(device.name)
        if failed:
            raise RuntimeError(f"{len(failed)} of {len(devices)} device(s) failed: {', '.join(failed)}.")
        return f"'{action}' completed for {len(devices)} device(s)."


class PduStatusButton(PduActionMixin, JobButtonReceiver):
    """JobButton: read outlet status for one device."""

    class Meta:
        name = "PDU Power Control: Status"

    def receive_job_button(self, obj):  # pylint: disable=arguments-differ
        self._run_button(obj, ACTION_STATUS)


class PduPowerOnButton(PduActionMixin, JobButtonReceiver):
    """JobButton: power ON one device's outlets."""

    class Meta:
        name = "PDU Power Control: On"

    def receive_job_button(self, obj):  # pylint: disable=arguments-differ
        self._run_button(obj, ACTION_ON)


class PduPowerOffButton(PduActionMixin, JobButtonReceiver):
    """JobButton: power OFF one device's outlets (tag-guarded)."""

    class Meta:
        name = "PDU Power Control: Off"

    def receive_job_button(self, obj):  # pylint: disable=arguments-differ
        self._run_button(obj, ACTION_OFF)


jobs = (PduPowerControl, PduStatusButton, PduPowerOnButton, PduPowerOffButton)
register_jobs(*jobs)

If you run Nautobot and switched APC PDUs, modeling your power cabling in the source of truth turns a tedious manual task into a two-click operation. The data is already there. This job just acts on it.