Originally published at developer-service.blog

Server Monitoring Without Prometheus - A Python + SSH Approach

You don’t need a monitoring stack that requires its own monitoring. If you’re running a handful of servers, Prometheus and Grafana are often more work than the problems they’re meant to solve.

If you can ssh user@server, you already have everything you need.

In this tutorial, we’ll build a lightweight, no-agent server monitoring tool using Python and SSH, designed for indie hackers, solo founders, and small teams. By the end, you’ll have a single script that checks CPU, memory, disk usage, and critical services across multiple servers, renders a clean terminal dashboard, and sends alerts when something actually breaks.

What we’ll build:

  • Monitor multiple servers over SSH (no agents, no daemons)
  • A real-time terminal dashboard with colors and status indicators
  • Alerts via ntfy.sh
  • Remote server updates with push notifications
  • One Python script you can read, trust, and extend

Why Not Just Use Prometheus?

Before we dive in, let’s get this out of the way: Prometheus and Grafana are excellent tools. At scale, they’re hard to beat, and they’re industry standard for a reason.

But if you’re running a small setup, say 1–20 servers, a few side projects, or a lean SaaS, they often introduce more complexity than actual value.

What starts as “just add monitoring” quickly turns into:

  • Heavy infrastructure: Prometheus, Grafana, node exporters, persistent storage, backups
  • Ongoing maintenance: Upgrades, config drift, broken dashboards at the worst time
  • A steep learning curve: PromQL, recording rules, and yet another YAML ecosystem

All of that just to answer a few basic questions:

Is the server up?

Is disk space running out?

Did nginx or Docker crash?

Meanwhile, SSH is already (probably) open on your servers, and Python is already installed (or trivial to install). You don’t need a full metrics pipeline to check system health; you just need a reliable way to run commands and interpret the results.

If you can ssh user@server, you can monitor it.

That’s the premise of this article. We’ll keep the stack boring, the moving parts minimal, and the script small enough that you can actually understand it, and trust it.


Project Setup

Let's get started by creating a new project directory and setting up the environment:

mkdir server-monitor
cd server-monitor

Then, create a requirements.txt file:

paramiko>=4.0.0
rich>=14.3.2
pyyaml>=6.0.3
requests>=2.32.5

And install the dependencies:

pip install -r requirements.txt

What each library does:

  • paramiko -> SSH connections and command execution
  • rich -> Beautiful terminal output with tables and colors
  • pyyaml -> Parse our server configuration file
  • requests -> Send alerts to webhooks (ntfy.sh)

Now create a servers.yml file to define which servers to monitor:

servers:
  - name: prd-dokku
    host: XX.XX.XX.XX
    user: root
    key_file: id_rsa
    services:
      - docker

  - name: prd-ghost
    host: XX.XX.XX.XX
    user: root
    key_file: id_rsa
    services:
      - docker

  - name: prd-n8n
    host: XX.XX.XX.XX
    user: root
    key_file: id_rsa
    services:
      - docker

thresholds:
  cpu_load_factor: 2.0 # Alert if the 1-minute load average exceeds this value
  memory_percent: 85
  disk_percent: 25

Replace the IP addresses and usernames with those of your actual servers.

Make sure you have SSH key authentication set up for each server. If your key has a passphrase, use the SSH_KEY_PASSPHRASE environment variable or add key_passphrase to a server entry; the script will also prompt interactively if needed.
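
If you want to sanity-check the file before writing any monitoring code, a few lines of Python with pyyaml will do (this mirrors the parsing the script will use later; the expected output in the comments assumes the example config above):

import yaml

with open("servers.yml") as f:
    config = yaml.safe_load(f)

print(config["thresholds"])                      # {'cpu_load_factor': 2.0, 'memory_percent': 85, 'disk_percent': 25}
print([s["name"] for s in config["servers"]])    # ['prd-dokku', 'prd-ghost', 'prd-n8n']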


SSH Connection Manager

Let's start building the monitor. Create a file called monitor.py; you'll build it up piece by piece.

First, you'll need a function to connect to servers and run commands over SSH:

"""
Simple server monitoring over SSH
"""

import os
import sys
import threading
import getpass
import paramiko
import yaml
from typing import Dict, List, Optional
from dataclasses import dataclass, field


@dataclass
class ServerMetrics:
    """Container for server metrics"""
    name: str
    reachable: bool = True
    cpu_load: Optional[float] = None
    memory_percent: Optional[float] = None
    disk_percent: Optional[float] = None
    services: Dict[str, str] = field(default_factory=dict)
    error: Optional[str] = None


class SSHClient:
    """Manages SSH connections to servers"""

    def __init__(
        self,
        host: str,
        user: str,
        key_file: Optional[str] = None,
        key_passphrase: Optional[str] = None,
    ):
        self.host = host
        self.user = user
        self.key_file = key_file
        self.key_passphrase = key_passphrase
        self.client = None

    def _load_pkey(self, passphrase: Optional[str] = None) -> Optional[paramiko.PKey]:
        """Load private key, trying RSA then Ed25519. Returns None to fall back to key_filename."""
        if not self.key_file:
            return None
        password = passphrase or self.key_passphrase
        # Try RSA key first (PEM format)
        try:
            return paramiko.RSAKey.from_private_key_file(
                self.key_file, password=password
            )
        except paramiko.ssh_exception.PasswordRequiredException:
            if password is None:
                password = getpass.getpass(f"SSH key passphrase for {self.key_file}: ")
            return paramiko.RSAKey.from_private_key_file(
                self.key_file, password=password
            )
        except (paramiko.ssh_exception.SSHException, OSError):
            pass
        # Try Ed25519
        try:
            return paramiko.Ed25519Key.from_private_key_file(
                self.key_file, password=password
            )
        except paramiko.ssh_exception.PasswordRequiredException:
            if password is None:
                password = getpass.getpass(f"SSH key passphrase for {self.key_file}: ")
            return paramiko.Ed25519Key.from_private_key_file(
                self.key_file, password=password
            )
        except (paramiko.ssh_exception.SSHException, OSError):
            pass
        return None

    def connect(self) -> bool:
        """Establish SSH connection"""
        try:
            self.client = paramiko.SSHClient()
            self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            connect_kwargs = {
                'hostname': self.host,
                'username': self.user,
                'timeout': 10,
                'look_for_keys': True,
                'allow_agent': True,
            }

            # Load key explicitly (RSA / Ed25519) with optional passphrase
            if self.key_file:
                pkey = self._load_pkey()
                if pkey is not None:
                    connect_kwargs['pkey'] = pkey
                else:
                    connect_kwargs['key_filename'] = self.key_file
                    if self.key_passphrase:
                        connect_kwargs['passphrase'] = self.key_passphrase

            self.client.connect(**connect_kwargs)
            return True

        except Exception as e:
            print(f"❌ Failed to connect to {self.host}: {e}")
            return False

    def run_command(
        self, command: str, stream_output: bool = False
    ) -> tuple[str, str, int]:
        """
        Run a command over SSH.
        Returns: (stdout, stderr, exit_code).
        If stream_output=True, print stdout/stderr as it arrives (for long-running commands).
        """
        if not self.client:
            return "", "Not connected", 1

        try:
            stdin, stdout, stderr = self.client.exec_command(command)
            channel = stdout.channel

            if stream_output:
                out_lines = []
                err_lines = []

                def read_stdout():
                    for line in iter(stdout.readline, ""):
                        decoded = line
                        out_lines.append(decoded)
                        print(decoded, end="")

                def read_stderr():
                    for line in iter(stderr.readline, ""):
                        decoded = line
                        err_lines.append(decoded)
                        print(decoded, end="", file=sys.stderr)

                t1 = threading.Thread(target=read_stdout)
                t2 = threading.Thread(target=read_stderr)
                t1.daemon = True
                t2.daemon = True
                t1.start()
                t2.start()
                exit_code = channel.recv_exit_status()
                t1.join(timeout=1)
                t2.join(timeout=1)
                return ("".join(out_lines).strip(), "".join(err_lines).strip(), exit_code)

            exit_code = channel.recv_exit_status()
            return (
                stdout.read().decode("utf-8", errors="replace").strip(),
                stderr.read().decode("utf-8", errors="replace").strip(),
                exit_code,
            )
        except Exception as e:
            return "", str(e), 1

    def close(self):
        """Close SSH connection"""
        if self.client:
            self.client.close()


def load_config(config_file: str = 'servers.yml') -> dict:
    """Load server configuration from YAML"""
    with open(config_file, 'r') as f:
        return yaml.safe_load(f)

This gives you the foundation: a clean SSH client that can connect and run commands, plus a data structure to hold the metrics.

It defines:

  • A ServerMetrics dataclass to store health information per server (reachability, CPU, memory, disk, service status, and errors).
  • An SSHClient wrapper around Paramiko that:
    • Handles SSH connections using RSA or Ed25519 keys (with optional passphrases)
    • Falls back to SSH agents or default keys when possible
    • Executes remote commands and captures output, errors, and exit codes (with optional streaming for long-running commands like apt upgrade)
    • Manages connection lifecycle cleanly
  • A load_config helper to read server definitions from a YAML file.

If you later want to extend this script with file-system-level monitoring (e.g. watching config or log files), see Mastering File System Monitoring with Watchdog in Python.

Note on SSH key passphrase: If your private key has a passphrase you can (1) set the SSH_KEY_PASSPHRASE environment variable (recommended for a cron job), (2) add key_passphrase to a server entry in servers.yml, or (3) let the script prompt you interactively when needed.
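
As a quick sanity check, you can already exercise this class on its own in a throwaway snippet at the bottom of monitor.py (remove it before continuing; the host, user, and key path below are placeholders):

# Throwaway connectivity test - host, user, and key path are placeholders
ssh = SSHClient("203.0.113.10", "root", key_file="id_rsa")
if ssh.connect():
    out, err, code = ssh.run_command("uptime")
    print(out if code == 0 else err)
    ssh.close()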


Collecting Metrics

Now you'll add functions to gather actual metrics from each server. Add these functions to monitor.py:

def get_cpu_load(ssh: SSHClient) -> Optional[float]:
    """Get 1-minute load average from uptime command"""
    stdout, stderr, code = ssh.run_command('uptime')
    if code != 0:
        return None

    # Parse: "... load average: 0.52, 0.58, 0.59"
    try:
        load_str = stdout.split('load average:')[1].split(',')[0].strip()
        return float(load_str)
    except (IndexError, ValueError):
        return None


def get_memory_usage(ssh: SSHClient) -> Optional[float]:
    """Get memory usage percentage from free command"""
    stdout, stderr, code = ssh.run_command('free -m')
    if code != 0:
        return None

    # Parse output to get used/total
    try:
        lines = stdout.split('\n')
        mem_line = [l for l in lines if l.startswith('Mem:')][0]
        parts = mem_line.split()
        total = float(parts[1])
        used = float(parts[2])
        return (used / total) * 100
    except (IndexError, ValueError):
        return None


def get_disk_usage(ssh: SSHClient) -> Optional[float]:
    """Get root disk usage percentage from df command"""
    stdout, stderr, code = ssh.run_command('df -h /')
    if code != 0:
        return None

    # Parse: "Filesystem Size Used Avail Use% Mounted"
    try:
        lines = stdout.split('\n')
        data_line = lines[1]
        use_percent = data_line.split()[4].rstrip('%')
        return float(use_percent)
    except (IndexError, ValueError):
        return None


def get_service_status(ssh: SSHClient, service_name: str) -> str:
    """Check if a systemd service is active"""
    stdout, stderr, code = ssh.run_command(f'systemctl is-active {service_name}')

    if stdout == 'active':
        return 'active'
    elif code != 0:
        # Could be 'inactive', 'failed', or service doesn't exist
        return stdout if stdout else 'unknown'
    return 'unknown'


def collect_server_metrics(server_config: dict, thresholds: dict) -> ServerMetrics:
    """
    Connect to a server and collect all metrics
    """
    name = server_config['name']
    host = server_config['host']
    user = server_config['user']
    services = server_config.get('services', [])
    key_file = server_config.get('key_file')
    key_passphrase = server_config.get('key_passphrase') or os.environ.get('SSH_KEY_PASSPHRASE')

    metrics = ServerMetrics(name=name)
    ssh = SSHClient(host, user, key_file, key_passphrase)

    # Try to connect
    if not ssh.connect():
        metrics.reachable = False
        metrics.error = f"Could not connect to {host}"
        return metrics

    try:
        # Gather all metrics
        metrics.cpu_load = get_cpu_load(ssh)
        metrics.memory_percent = get_memory_usage(ssh)
        metrics.disk_percent = get_disk_usage(ssh)

        # Check services
        for service in services:
            metrics.services[service] = get_service_status(ssh, service)

    except Exception as e:
        metrics.error = str(e)

    finally:
        ssh.close()

    return metrics

This code collects basic server health metrics over SSH using standard Linux commands.

It:

  • Retrieves CPU load, memory usage, and disk usage by running uptime, free, and df remotely and parsing their output
  • Checks the status of systemd services via systemctl is-active
  • Wraps all results in a ServerMetrics object
  • Handles connection failures and parsing errors gracefully
  • Uses SSH keys (with optional passphrases) for secure, agentless access

If something fails, it returns None or a status string rather than crashing, which is important when monitoring multiple servers.
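
To try a single collector in isolation, you can call collect_server_metrics directly with a dict shaped like one entry from servers.yml (the host below is a placeholder, and the printed result is only an illustration):

server = {
    "name": "prd-dokku",
    "host": "203.0.113.10",   # placeholder - use one of your hosts
    "user": "root",
    "key_file": "id_rsa",
    "services": ["docker"],
}

metrics = collect_server_metrics(server, thresholds={})
print(metrics)
# e.g. ServerMetrics(name='prd-dokku', reachable=True, cpu_load=0.03,
#      memory_percent=17.8, disk_percent=15.0, services={'docker': 'active'}, error=None)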


Terminal Dashboard with Rich

Now for the fun part: displaying our metrics in a beautiful terminal table. Add this to monitor.py:

from rich.console import Console
from rich.table import Table
from rich import box


def create_status_icon(value: Optional[float], threshold: float,
                       reverse: bool = False) -> str:
    """
    Create a colored status icon based on value vs threshold
    reverse=True means lower is better (e.g., disk usage)
    """
    if value is None:
        return ""

    if reverse:
        # Lower is better (disk, memory)
        if value < threshold * 0.7:
            return "✅"
        elif value < threshold:
            return "⚠️"
        else:
            return "❌"
    else:
        # CPU load: above the threshold is critical, above 70% of it is a warning
        if value > threshold:
            return "❌"
        elif value > threshold * 0.7:
            return "⚠️"
        else:
            return "✅"


def format_metric_value(value: Optional[float], suffix: str = "",
                       decimals: int = 1) -> str:
    """Format a metric value for display"""
    if value is None:
        return ""
    return f"{value:.{decimals}f}{suffix}"


def display_dashboard(metrics_list: List[ServerMetrics], thresholds: dict):
    """Display monitoring dashboard using Rich tables"""
    console = Console()

    # Create table
    table = Table(title="🖥️  Server Monitoring Dashboard",
                  box=box.ROUNDED,
                  show_header=True,
                  header_style="bold cyan")

    # Add columns
    table.add_column("Server", style="bold white", no_wrap=True)
    table.add_column("Status", justify="center")
    table.add_column("CPU Load", justify="right")
    table.add_column("Memory", justify="right")
    table.add_column("Disk", justify="right")
    table.add_column("Services", justify="left")

    # Add rows
    for metrics in metrics_list:
        # Overall status
        if not metrics.reachable:
            status = "🔴 Down"
        elif metrics.error:
            status = "⚠️ Error"
        else:
            status = "🟢 Up"

        # CPU with status icon
        cpu_display = format_metric_value(metrics.cpu_load)
        cpu_icon = create_status_icon(
            metrics.cpu_load,
            thresholds.get('cpu_load_factor', 2.0),
            reverse=False
        )
        cpu_cell = f"{cpu_icon} {cpu_display}"

        # Memory with status icon and percentage
        mem_display = format_metric_value(metrics.memory_percent, "%")
        mem_icon = create_status_icon(
            metrics.memory_percent,
            thresholds.get('memory_percent', 85),
            reverse=True
        )
        mem_cell = f"{mem_icon} {mem_display}"

        # Disk with status icon and percentage
        disk_display = format_metric_value(metrics.disk_percent, "%")
        disk_icon = create_status_icon(
            metrics.disk_percent,
            thresholds.get('disk_percent', 80),
            reverse=True
        )
        disk_cell = f"{disk_icon} {disk_display}"

        # Services status
        if metrics.services:
            service_items = []
            for svc, status_val in metrics.services.items():
                icon = "" if status_val == "active" else ""
                service_items.append(f"{icon} {svc}")
            services_cell = "\n".join(service_items)
        else:
            services_cell = ""

        # Add row to table
        table.add_row(
            metrics.name,
            status,
            cpu_cell,
            mem_cell,
            disk_cell,
            services_cell
        )

    # Display
    console.print()
    console.print(table)
    console.print()

The rich library makes this surprisingly easy.

It:

  • Converts raw metric values into clear status icons (✅ ⚠️ ❌) based on configurable thresholds
  • Formats CPU, memory, and disk metrics for human-friendly display
  • Builds a styled Rich table showing per-server health, resource usage, and service status
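
You can preview the dashboard without touching any servers by feeding it a couple of hand-built ServerMetrics objects (the values below are made up):

fake_metrics = [
    ServerMetrics(name="prd-dokku", cpu_load=0.4, memory_percent=17.8,
                  disk_percent=15.0, services={"docker": "active"}),
    ServerMetrics(name="prd-ghost", reachable=False, error="Could not connect"),
]

display_dashboard(fake_metrics, {"cpu_load_factor": 2.0, "memory_percent": 85, "disk_percent": 80})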

Alert System

Monitoring without alerts is just anxiety. Let's add a simple alert system that can notify you via ntfy.sh. Add these functions:

def check_thresholds(metrics: ServerMetrics, thresholds: dict) -> List[str]:
    """
    Check if any metrics breach thresholds
    Returns list of alert messages
    """
    alerts = []

    if not metrics.reachable:
        alerts.append(f"🔴 {metrics.name}: Server unreachable")
        return alerts

    # CPU load
    if metrics.cpu_load and metrics.cpu_load > thresholds.get('cpu_load_factor', 2.0):
        alerts.append(
            f"⚠️ {metrics.name}: High CPU load ({metrics.cpu_load:.2f})"
        )

    # Memory
    if metrics.memory_percent and metrics.memory_percent > thresholds.get('memory_percent', 85):
        alerts.append(
            f"⚠️ {metrics.name}: High memory usage ({metrics.memory_percent:.1f}%)"
        )

    # Disk
    if metrics.disk_percent and metrics.disk_percent > thresholds.get('disk_percent', 80):
        alerts.append(
            f"⚠️ {metrics.name}: High disk usage ({metrics.disk_percent:.1f}%)"
        )

    # Failed services
    for service, status in metrics.services.items():
        if status != 'active':
            alerts.append(
                f"{metrics.name}: Service '{service}' is {status}"
            )

    return alerts


def send_alert_ntfy(message: str, topic: str):
    """Send alert to ntfy.sh"""
    try:
        import requests
        url = f"https://ntfy.sh/{topic}"
        requests.post(url, data=message.encode('utf-8'), timeout=10)
    except Exception as e:
        print(f"Failed to send ntfy alert: {e}")


def send_alerts(alerts: List[str], config: dict):
    """Send alerts through configured channels"""
    if not alerts:
        return

    alert_config = config.get('alerts', {})
    message = "\n".join(alerts)

    # Send to ntfy
    if ntfy_topic := alert_config.get('ntfy_topic'):
        send_alert_ntfy(message, ntfy_topic)

    # Print to console
    console = Console()
    console.print(f"\n[bold red]🚨 Alerts:[/bold red]")
    for alert in alerts:
        console.print(f"  {alert}")
    console.print()

The alert logic is straightforward:

  • Evaluates collected server metrics against configurable thresholds
  • Detects unreachable servers, high CPU/memory/disk usage, and failed services
  • Generates human-readable alert messages
  • Sends alerts to ntfy.sh

You can easily add more channels (email, Slack, Telegram, PagerDuty) by adding similar send_alert_* functions.
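
For example, a Slack channel could look like the sketch below, assuming you have an incoming-webhook URL and add a slack_webhook key next to ntfy_topic in the alerts config (both names are my own choices, not part of the script above):

def send_alert_slack(message: str, webhook_url: str):
    """Send alert to a Slack incoming webhook (sketch)"""
    try:
        import requests
        # Slack incoming webhooks accept a JSON payload with a "text" field
        requests.post(webhook_url, json={"text": message}, timeout=10)
    except Exception as e:
        print(f"Failed to send Slack alert: {e}")

Wire it into send_alerts the same way as ntfy, e.g. if slack_url := alert_config.get('slack_webhook'): send_alert_slack(message, slack_url).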

To configure alerts, add this to your servers.yml:

alerts:
  ntfy_topic: "myapp-monitoring" # Optional

Putting It All Together

Now let's add the main function that ties everything together:

def main():
    """Main monitoring loop"""
    console = Console()

    try:
        # Load configuration
        config = load_config('servers.yml')
        servers = config.get('servers', [])
        thresholds = config.get('thresholds', {})

        if not servers:
            console.print("[red]No servers configured in servers.yml[/red]")
            return

        console.print(f"[cyan]Monitoring {len(servers)} server(s)...[/cyan]\n")

        # Collect metrics from all servers
        all_metrics = []
        all_alerts = []

        for server in servers:
            metrics = collect_server_metrics(server, thresholds)
            all_metrics.append(metrics)

            # Check for alerts
            alerts = check_thresholds(metrics, thresholds)
            all_alerts.extend(alerts)

        # Display dashboard
        display_dashboard(all_metrics, thresholds)

        # Send alerts if any
        if all_alerts:
            send_alerts(all_alerts, config)
        else:
            console.print("[green]✅ All systems healthy[/green]\n")

    except FileNotFoundError:
        console.print("[red]Error: servers.yml not found[/red]")
    except Exception as e:
        console.print(f"[red]Error: {e}[/red]")


if __name__ == '__main__':
    main()

This is the main entry point of the monitoring script:

  • Loads server and threshold configuration from servers.yml
  • Connects to each server and collects health metrics
  • Evaluates metrics against alert thresholds
  • Displays a Rich-based monitoring dashboard
  • Sends alerts if any issues are detected, otherwise reports a healthy state

That's it! You now have a complete monitoring script.

Let's test it:

python monitor.py

You should see a beautiful table showing the status of all your servers:

Monitoring 3 server(s)...

SSH key passphrase for id_rsa: 
SSH key passphrase for id_rsa: 
SSH key passphrase for id_rsa: 

                  🖥️  Server Monitoring Dashboard                  
╭───────────┬────────┬──────────┬──────────┬──────────┬───────────╮
│ Server    │ Status │ CPU Load │   Memory │     Disk │ Services  │
├───────────┼────────┼──────────┼──────────┼──────────┼───────────┤
│ prd-dokku │ 🟢 Up  │   ✅ 0.0 │ ✅ 17.8% │ ✅ 15.0% │ ✅ docker │
│ prd-ghost │ 🟢 Up  │   ✅ 0.5 │ ✅ 46.6% │ ❌ 32.0% │ ✅ docker │
│ prd-n8n   │ 🟢 Up  │   ✅ 0.0 │ ✅ 26.7% │ ⚠️ 19.0% │ ✅ docker │
╰───────────┴────────┴──────────┴──────────┴──────────┴───────────╯


🚨 Alerts:
  ⚠️ prd-ghost: High disk usage (32.0%)

And in this case, you should also receive a notification:

[Screenshot: ntfy.sh notification]


Remote Server Updates

Let's extend your monitor to handle server updates. Add these functions to monitor.py:

def run_server_update(ssh: SSHClient, stream_output: bool = True) -> dict:
    """
    Run apt update and upgrade on a server.
    stream_output=True prints apt progress so it doesn't look stuck (can take 5–15 min).
    Returns dict with results.
    """
    result = {
        'success': False,
        'output': '',
        'reboot_required': False,
        'error': None
    }

    # Run update and upgrade (stream output so user sees progress)
    # DEBIAN_FRONTEND=noninteractive avoids any prompts that could hang
    cmd = 'sudo DEBIAN_FRONTEND=noninteractive apt-get update && sudo DEBIAN_FRONTEND=noninteractive apt-get upgrade -y 2>&1'
    stdout, stderr, code = ssh.run_command(cmd, stream_output=stream_output)

    result['output'] = stdout
    result['success'] = (code == 0)

    if code != 0:
        result['error'] = stderr or 'Update failed'
        return result

    # Check if reboot is required
    reboot_check_cmd = '[ -f /var/run/reboot-required ] && echo "yes" || echo "no"'
    reboot_stdout, _, _ = ssh.run_command(reboot_check_cmd)
    result['reboot_required'] = (reboot_stdout.strip() == 'yes')

    return result


def update_server(server_config: dict, notify_topic: Optional[str] = None) -> bool:
    """Update a single server and optionally send notification"""
    name = server_config['name']
    host = server_config['host']
    user = server_config['user']
    key_file = server_config.get('key_file')
    key_passphrase = server_config.get('key_passphrase') or os.environ.get('SSH_KEY_PASSPHRASE')

    console = Console()
    console.print(f"[cyan]Updating {name}...[/cyan]")
    console.print("[dim]Running apt update & upgrade (may take 5–15 min). Output streams below.[/dim]\n")

    ssh = SSHClient(host, user, key_file, key_passphrase)

    if not ssh.connect():
        console.print(f"[red]❌ Could not connect to {name}[/red]")
        return False

    try:
        result = run_server_update(ssh)

        if result['success']:
            reboot_msg = "⚠️ Reboot required" if result['reboot_required'] else "No reboot needed"
            console.print(f"[green]✅ {name} updated successfully[/green]")
            console.print(f"   {reboot_msg}")

            # Send notification
            if notify_topic:
                message = f"{name} updated successfully\n{reboot_msg}"
                send_alert_ntfy(message, notify_topic)

            return True
        else:
            console.print(f"[red]❌ {name} update failed: {result['error']}[/red]")

            if notify_topic:
                message = f"{name} update failed\n{result['error']}"
                send_alert_ntfy(message, notify_topic)

            return False

    finally:
        ssh.close()


def update_all_servers():
    """Update all configured servers"""
    config = load_config('servers.yml')
    servers = config.get('servers', [])
    notify_topic = config.get('alerts', {}).get('ntfy_topic')

    console = Console()
    console.print(f"\n[bold cyan]Updating {len(servers)} server(s)...[/bold cyan]\n")

    for server in servers:
        update_server(server, notify_topic)
        console.print()

This code adds remote server update automation to the monitoring tool:

  • Runs apt update and apt upgrade over SSH with streaming output so progress is visible (avoids looking stuck; updates can take 5–15 minutes)
  • Uses DEBIAN_FRONTEND=noninteractive so apt never waits for prompts
  • Detects whether a reboot is required after updates
  • Reports success or failure with clear terminal output
  • Sends update notifications via ntfy.sh
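
The tool only reports that a reboot is pending; if you want to act on it, one possible follow-up (not part of the script above, shown here as a sketch) is a helper that schedules the reboot a minute out, so the command returns and the SSH session closes cleanly:

def reboot_if_required(server_config: dict, notify_topic: Optional[str] = None) -> bool:
    """Sketch: reboot a server one minute from now, but only if it flagged a reboot as required"""
    ssh = SSHClient(
        server_config['host'],
        server_config['user'],
        server_config.get('key_file'),
        server_config.get('key_passphrase') or os.environ.get('SSH_KEY_PASSPHRASE'),
    )
    if not ssh.connect():
        return False
    try:
        flagged, _, _ = ssh.run_command('[ -f /var/run/reboot-required ] && echo yes || echo no')
        if flagged.strip() != 'yes':
            return False
        # shutdown -r +1 returns immediately, so this command exits before the reboot starts
        _, err, code = ssh.run_command('sudo shutdown -r +1 "Reboot after package updates"')
        if code == 0 and notify_topic:
            send_alert_ntfy(f"{server_config['name']}: reboot scheduled in 1 minute", notify_topic)
        return code == 0
    finally:
        ssh.close()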

Now update the main() function to support an --update flag:

import sys

def main():
    """Main monitoring script"""
    console = Console()

    # Check for update mode
    if len(sys.argv) > 1 and sys.argv[1] == '--update':
        update_all_servers()
        return

    # ... rest of the monitoring code stays the same ...

You can update all your servers with:

python monitor.py --update

The script will connect to each server, run apt-get update && apt-get upgrade -y (with output streamed so you see progress), check if a reboot is needed, and send you a notification via ntfy:

Updating 3 server(s)...

Updating prd-dokku...
Running apt update & upgrade (may take 5–15 min). Output streams below.

SSH key passphrase for id_rsa:
Hit:1 https://mirror.hetzner.com/ubuntu/packages noble InRelease
Hit:2 https://download.docker.com/linux/ubuntu noble InRelease
Hit:3 https://mirror.hetzner.com/ubuntu/packages noble-updates InRelease
Hit:4 https://mirror.hetzner.com/ubuntu/packages noble-backports InRelease
Hit:5 https://mirror.hetzner.com/ubuntu/security noble-security InRelease
Hit:6 https://packagecloud.io/dokku/dokku/ubuntu noble InRelease
Reading package lists...
Reading package lists...
Building dependency tree...
Reading state information...
Calculating upgrade...
The following upgrades have been deferred due to phasing:
  libldap-common libldap2 python-apt-common python3-apt
The following packages have been kept back:
  linux-image-virtual
0 upgraded, 0 newly installed, 0 to remove and 5 not upgraded.
✅ prd-dokku updated successfully
   ⚠️ Reboot required

Updating prd-ghost...
Running apt update & upgrade (may take 5–15 min). Output streams below.

SSH key passphrase for id_rsa:

[...]

Running kernel seems to be up-to-date.

Restarting services...
 systemctl restart qemu-guest-agent.service

Service restarts being deferred:
 /etc/needrestart/restart.d/dbus.service
 systemctl restart getty@tty1.service
 systemctl restart serial-getty@ttyS0.service
 systemctl restart systemd-logind.service
 systemctl restart unattended-upgrades.service

No containers need to be restarted.

No user sessions are running outdated binaries.

No VM guests are running outdated hypervisor (qemu) binaries on this host.
✅ prd-ghost updated successfully
   ⚠️ Reboot required

Updating prd-n8n...
Running apt update & upgrade (may take 5–15 min). Output streams below.

SSH key passphrase for id_rsa:
[...]

Running kernel seems to be up-to-date.

Restarting services...
 systemctl restart qemu-guest-agent.service

Service restarts being deferred:
 systemctl restart systemd-logind.service
 systemctl restart unattended-upgrades.service

No containers need to be restarted.

No user sessions are running outdated binaries.

No VM guests are running outdated hypervisor (qemu) binaries on this host.
✅ prd-n8n updated successfully
   No reboot needed

You will also get the notifications:

[Screenshot: ntfy.sh notifications]


Wrapping Up

You don't have to choose between "no monitoring" and "full Prometheus/Grafana stack."

This middle ground (one Python script, SSH, and a few shell commands) gives you real visibility with minimal complexity.

The script you just built is production-ready for small setups. It's simple enough that you can debug it at 2 AM when something breaks, yet powerful enough to keep your servers healthy and your phone quiet. If you want to avoid scope-related bugs when extending it, Master Python Variable Scope is a concise, practical guide (LEGB, local vs global).

When you outgrow this approach (dozens of servers, complex dependencies, multiple teams), you'll know exactly what you need from a heavier system. Until then, keep it boring.


Download the complete script: https://github.com/nunombispo/server-monitoring-python-article

Happy monitoring! 🚀


Follow me on Twitter: https://twitter.com/DevAsService

Follow me on Instagram: https://www.instagram.com/devasservice/

Follow me on TikTok: https://www.tiktok.com/@devasservice

Follow me on YouTube: https://www.youtube.com/@DevAsService
