Skip to content

vaultecki/py-vault-multicast

Repository files navigation

py-vault-multicast

A Python library for multicast-based service discovery in local networks with integrated monitoring and metrics.

License Python

Features

  • 🚀 Simple API - Easy-to-use publisher and listener classes
  • 📡 Multicast Discovery - Find devices/services in local network automatically
  • 📊 Built-in Metrics - Track packets, bytes, errors, and active services
  • 🔄 Auto-retry - Automatic reconnection on errors
  • 🧵 Thread-safe - All operations protected with locks
  • 🎯 PyQt6 Widget - Ready-to-use GUI component for service discovery
  • Lightweight - Minimal dependencies and overhead
  • 🛡️ Context Manager - Clean resource management with with statements

Installation

pip install -r requirements.txt

Requirements

  • Python 3.7+
  • PySignal >= 1.1.1
  • PyQt6 (optional, only for GUI components)

Quick Start

Publisher (Broadcasting Service)

import vault_multicast
import json
import time

# Create service information
service_info = {
    "type": "VaultLibrary",
    "name": "My Service",
    "addr": "192.168.1.100:8080",
    "version": "1.0.0"
}

# Start publisher
publisher = vault_multicast.VaultMultiPublisher(
    message=json.dumps(service_info),
    timeout=2.0  # Broadcast every 2 seconds
)

publisher.start()

# Check metrics
time.sleep(10)
metrics = publisher.get_metrics()
print(f"Sent {metrics['packets_sent']} packets")

publisher.stop()

Listener (Discovering Services)

import vault_multicast

def on_service_found(service_data):
    print(f"Found service: {service_data}")

# Start listener
listener = vault_multicast.VaultMultiListener(
    callback=on_service_found
)

listener.start()

# Or use signals
listener.recv_signal.connect(on_service_found)

# Check metrics
metrics = listener.get_metrics()
print(f"Active services: {metrics['active_services']}")
print(f"Received {metrics['packets_received']} packets")

listener.stop()

Context Manager

import vault_multicast

# Automatic cleanup
with vault_multicast.VaultMultiListener() as listener:
    input("Press Enter to stop...\n")
    print(listener.get_metrics())

GUI Component

Service Discovery Widget

import sys
from PyQt6.QtWidgets import QApplication
from vault_multicast_service_discovery import VaultServiceDiscovery

app = QApplication(sys.argv)

# Create discovery widget with type filter
discovery = VaultServiceDiscovery(type_filter="VaultLibrary")

# Handle service selection
def on_service_selected(service_data):
    print(f"Selected: {service_data}")

discovery.return_signal.connect(on_service_selected)
discovery.show()

sys.exit(app.exec())

Features:

  • Real-time service list with auto-refresh
  • Automatic timeout (removes services after 30 seconds)
  • Live metrics display (packets, bytes, active services)
  • Manual refresh button
  • Metrics reset functionality

API Reference

VaultMultiPublisher

Broadcasts service information via multicast.

Constructor

VaultMultiPublisher(
    group: str = "224.1.1.1",      # Multicast group
    port: int = 5004,               # UDP port
    ttl: int = 2,                   # Time-to-live
    timeout: float = 2.0,           # Seconds between broadcasts
    message: str = "..."            # Message to broadcast
)

Methods

  • start() - Start broadcasting
  • stop(timeout=5.0) - Stop broadcasting
  • update_message(message) - Update broadcast message
  • get_metrics() - Get current metrics dictionary
  • reset_metrics() - Reset all counters
  • is_running() - Check if publisher is active

Properties

  • message - Get/set current broadcast message (thread-safe)

VaultMultiListener

Listens for multicast service announcements.

Constructor

VaultMultiListener(
    group: str = "224.1.1.1",      # Multicast group
    port: int = 5004,               # UDP port
    timeout: float = 2.0,           # Socket timeout
    buffer_size: int = 1400,        # Receive buffer size
    callback: Callable = None       # Optional callback function
)

Methods

  • start() - Start listening
  • stop(timeout=5.0) - Stop listening
  • get_metrics() - Get current metrics dictionary
  • reset_metrics() - Reset all counters
  • is_running() - Check if listener is active

Signals

  • recv_signal - Emitted when message received (PySignal)

Metrics

Both publisher and listener track the following metrics:

{
    "packets_sent": 150,           # Publisher only
    "packets_received": 450,       # Listener only
    "bytes_sent": 45000,           # Publisher only
    "bytes_received": 135000,      # Listener only
    "errors": 0,                   # Error count
    "active_services": 3,          # Listener only (unique services)
    "uptime_seconds": 300.5,       # Time since start
    "packets_per_second": 1.5      # Average rate
}

Message Format

Services should broadcast JSON messages with at least these fields:

{
    "type": "VaultLibrary",           // Service type (required)
    "name": "My Service",             // Display name
    "addr": "192.168.1.100:8080",    // Connection address (required)
    "version": "1.0.0",              // Version string
    "timestamp": 1234567890.123      // Unix timestamp
}

Configuration

Default Settings

DEFAULT_MULTICAST_GROUP = "224.1.1.1"
DEFAULT_PORT = 5004
DEFAULT_TTL = 2
DEFAULT_TIMEOUT = 2.0
DEFAULT_BUFFER_SIZE = 1400
SERVICE_TIMEOUT_SECONDS = 30  # GUI component

Custom Configuration

# Use different multicast group and port
publisher = vault_multicast.VaultMultiPublisher(
    group="239.255.255.250",
    port=1900,
    ttl=4
)

listener = vault_multicast.VaultMultiListener(
    group="239.255.255.250",
    port=1900,
    buffer_size=2048
)

Examples

Complete Publisher/Listener Example

import vault_multicast
import json
import time
import logging

logging.basicConfig(level=logging.INFO)

# Publisher thread
service_info = {
    "type": "TestService",
    "name": "Example Service",
    "addr": "127.0.0.1:8080",
    "timestamp": time.time()
}

publisher = vault_multicast.VaultMultiPublisher(
    message=json.dumps(service_info),
    timeout=1.0
)
publisher.start()

# Listener thread
def on_service(data):
    print(f"Discovered: {data['name']} at {data['addr']}")

listener = vault_multicast.VaultMultiListener(callback=on_service)
listener.start()

# Run for 30 seconds
try:
    time.sleep(30)
finally:
    # Show metrics
    print("\nPublisher Metrics:")
    print(publisher.get_metrics())
    
    print("\nListener Metrics:")
    print(listener.get_metrics())
    
    # Clean shutdown
    publisher.stop()
    listener.stop()

Export Metrics to JSON

import json
from datetime import datetime

def export_metrics(publisher, listener, filename):
    """Export metrics to JSON file."""
    data = {
        "timestamp": datetime.now().isoformat(),
        "publisher": publisher.get_metrics(),
        "listener": listener.get_metrics()
    }
    
    with open(filename, 'w') as f:
        json.dump(data, f, indent=2)
    
    print(f"Metrics exported to {filename}")

# Usage
export_metrics(publisher, listener, "metrics.json")

Dynamic Message Updates

import vault_multicast
import json
import time

publisher = vault_multicast.VaultMultiPublisher(
    message=json.dumps({"type": "test", "addr": "127.0.0.1:8080"}),
    timeout=1.0
)
publisher.start()

# Update message every 10 seconds
for i in range(10):
    time.sleep(10)
    
    new_message = {
        "type": "test",
        "addr": "127.0.0.1:8080",
        "iteration": i,
        "timestamp": time.time()
    }
    
    publisher.update_message(json.dumps(new_message))
    print(f"Updated message (iteration {i})")

publisher.stop()

Testing

Run the built-in test:

# Test publisher and listener
python vault_multicast.py

# Test GUI component
python vault_multicast_service_discovery.py

Thread Safety

All public methods are thread-safe:

  • Metric updates use locks
  • Message updates are atomic
  • Signal emissions are thread-safe (PySignal)
  • Qt operations run on main thread via timers

Performance

Benchmarks

Typical performance on modern hardware:

  • Publisher: ~0.5 packets/second (with 2s timeout)
  • Listener: Handles 1000+ packets/second
  • Memory: ~5-10 MB per instance
  • CPU: <1% during normal operation
  • Metric overhead: <2% performance impact

Optimization Tips

  1. Adjust broadcast interval: Longer intervals reduce network traffic
  2. Filter by type: Use type_filter in GUI to reduce processing
  3. Buffer size: Increase for high-traffic networks
  4. Cleanup interval: Adjust SERVICE_TIMEOUT for your use case

Troubleshooting

No Services Discovered

  1. Check firewall allows UDP multicast (port 5004)
  2. Verify devices are on same network/VLAN
  3. Check multicast routing is enabled
  4. Confirm TTL is sufficient for network topology
# Linux: Enable multicast routing
sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev eth0

High Error Rate

  1. Check network stability
  2. Verify message format is valid JSON
  3. Ensure messages are under buffer_size (default 1400 bytes)
  4. Check for UDP packet loss
# Monitor errors
metrics = listener.get_metrics()
if metrics['errors'] > 10:
    print("High error rate detected!")
    listener.reset_metrics()

Services Timeout Too Quickly

Adjust timeout in service discovery:

# In vault_multicast_service_discovery.py
SERVICE_TIMEOUT_SECONDS = 60  # Increase to 60 seconds

License

Apache License 2.0

Copyright [2025] [ecki]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Development Setup

git clone https://github.com/yourusername/py-vault-multicast.git
cd py-vault-multicast
pip install -r requirements.txt

Running Tests

python -m pytest tests/

Changelog

Version 2.0.0 (2025)

  • ✨ Added integrated metrics tracking
  • ✨ Added active service counting
  • ✨ Enhanced GUI with real-time metrics
  • ✨ Added automatic service timeout
  • ✨ Improved thread safety
  • ✨ Better error handling and logging
  • 🐛 Fixed race condition in listener thread
  • 🐛 Fixed memory leak in service tracking

Version 1.0.0 (2025)

  • 🎉 Initial release
  • Publisher and Listener classes
  • PyQt6 service discovery widget
  • Basic signal support

Related Projects

Support

Authors

  • ecki - Initial work and maintenance

Acknowledgments

  • Thanks to all contributors
  • Inspired by various service discovery protocols
  • Built with Python and PyQt6

About

simple multicast udp class publisher and receiver, used in other projects

Topics

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages