A Python library for multicast-based service discovery in local networks with integrated monitoring and metrics.
- 🚀 Simple API - Easy-to-use publisher and listener classes
- 📡 Multicast Discovery - Find devices/services in local network automatically
- 📊 Built-in Metrics - Track packets, bytes, errors, and active services
- 🔄 Auto-retry - Automatic reconnection on errors
- 🧵 Thread-safe - All operations protected with locks
- 🎯 PyQt6 Widget - Ready-to-use GUI component for service discovery
- ⚡ Lightweight - Minimal dependencies and overhead
- 🛡️ Context Manager - Clean resource management with
withstatements
pip install -r requirements.txt- Python 3.7+
- PySignal >= 1.1.1
- PyQt6 (optional, only for GUI components)
import vault_multicast
import json
import time
# Create service information
service_info = {
"type": "VaultLibrary",
"name": "My Service",
"addr": "192.168.1.100:8080",
"version": "1.0.0"
}
# Start publisher
publisher = vault_multicast.VaultMultiPublisher(
message=json.dumps(service_info),
timeout=2.0 # Broadcast every 2 seconds
)
publisher.start()
# Check metrics
time.sleep(10)
metrics = publisher.get_metrics()
print(f"Sent {metrics['packets_sent']} packets")
publisher.stop()import vault_multicast
def on_service_found(service_data):
print(f"Found service: {service_data}")
# Start listener
listener = vault_multicast.VaultMultiListener(
callback=on_service_found
)
listener.start()
# Or use signals
listener.recv_signal.connect(on_service_found)
# Check metrics
metrics = listener.get_metrics()
print(f"Active services: {metrics['active_services']}")
print(f"Received {metrics['packets_received']} packets")
listener.stop()import vault_multicast
# Automatic cleanup
with vault_multicast.VaultMultiListener() as listener:
input("Press Enter to stop...\n")
print(listener.get_metrics())import sys
from PyQt6.QtWidgets import QApplication
from vault_multicast_service_discovery import VaultServiceDiscovery
app = QApplication(sys.argv)
# Create discovery widget with type filter
discovery = VaultServiceDiscovery(type_filter="VaultLibrary")
# Handle service selection
def on_service_selected(service_data):
print(f"Selected: {service_data}")
discovery.return_signal.connect(on_service_selected)
discovery.show()
sys.exit(app.exec())Features:
- Real-time service list with auto-refresh
- Automatic timeout (removes services after 30 seconds)
- Live metrics display (packets, bytes, active services)
- Manual refresh button
- Metrics reset functionality
Broadcasts service information via multicast.
VaultMultiPublisher(
group: str = "224.1.1.1", # Multicast group
port: int = 5004, # UDP port
ttl: int = 2, # Time-to-live
timeout: float = 2.0, # Seconds between broadcasts
message: str = "..." # Message to broadcast
)start()- Start broadcastingstop(timeout=5.0)- Stop broadcastingupdate_message(message)- Update broadcast messageget_metrics()- Get current metrics dictionaryreset_metrics()- Reset all countersis_running()- Check if publisher is active
message- Get/set current broadcast message (thread-safe)
Listens for multicast service announcements.
VaultMultiListener(
group: str = "224.1.1.1", # Multicast group
port: int = 5004, # UDP port
timeout: float = 2.0, # Socket timeout
buffer_size: int = 1400, # Receive buffer size
callback: Callable = None # Optional callback function
)start()- Start listeningstop(timeout=5.0)- Stop listeningget_metrics()- Get current metrics dictionaryreset_metrics()- Reset all countersis_running()- Check if listener is active
recv_signal- Emitted when message received (PySignal)
Both publisher and listener track the following metrics:
{
"packets_sent": 150, # Publisher only
"packets_received": 450, # Listener only
"bytes_sent": 45000, # Publisher only
"bytes_received": 135000, # Listener only
"errors": 0, # Error count
"active_services": 3, # Listener only (unique services)
"uptime_seconds": 300.5, # Time since start
"packets_per_second": 1.5 # Average rate
}Services should broadcast JSON messages with at least these fields:
{
"type": "VaultLibrary", // Service type (required)
"name": "My Service", // Display name
"addr": "192.168.1.100:8080", // Connection address (required)
"version": "1.0.0", // Version string
"timestamp": 1234567890.123 // Unix timestamp
}DEFAULT_MULTICAST_GROUP = "224.1.1.1"
DEFAULT_PORT = 5004
DEFAULT_TTL = 2
DEFAULT_TIMEOUT = 2.0
DEFAULT_BUFFER_SIZE = 1400
SERVICE_TIMEOUT_SECONDS = 30 # GUI component# Use different multicast group and port
publisher = vault_multicast.VaultMultiPublisher(
group="239.255.255.250",
port=1900,
ttl=4
)
listener = vault_multicast.VaultMultiListener(
group="239.255.255.250",
port=1900,
buffer_size=2048
)import vault_multicast
import json
import time
import logging
logging.basicConfig(level=logging.INFO)
# Publisher thread
service_info = {
"type": "TestService",
"name": "Example Service",
"addr": "127.0.0.1:8080",
"timestamp": time.time()
}
publisher = vault_multicast.VaultMultiPublisher(
message=json.dumps(service_info),
timeout=1.0
)
publisher.start()
# Listener thread
def on_service(data):
print(f"Discovered: {data['name']} at {data['addr']}")
listener = vault_multicast.VaultMultiListener(callback=on_service)
listener.start()
# Run for 30 seconds
try:
time.sleep(30)
finally:
# Show metrics
print("\nPublisher Metrics:")
print(publisher.get_metrics())
print("\nListener Metrics:")
print(listener.get_metrics())
# Clean shutdown
publisher.stop()
listener.stop()import json
from datetime import datetime
def export_metrics(publisher, listener, filename):
"""Export metrics to JSON file."""
data = {
"timestamp": datetime.now().isoformat(),
"publisher": publisher.get_metrics(),
"listener": listener.get_metrics()
}
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
print(f"Metrics exported to {filename}")
# Usage
export_metrics(publisher, listener, "metrics.json")import vault_multicast
import json
import time
publisher = vault_multicast.VaultMultiPublisher(
message=json.dumps({"type": "test", "addr": "127.0.0.1:8080"}),
timeout=1.0
)
publisher.start()
# Update message every 10 seconds
for i in range(10):
time.sleep(10)
new_message = {
"type": "test",
"addr": "127.0.0.1:8080",
"iteration": i,
"timestamp": time.time()
}
publisher.update_message(json.dumps(new_message))
print(f"Updated message (iteration {i})")
publisher.stop()Run the built-in test:
# Test publisher and listener
python vault_multicast.py
# Test GUI component
python vault_multicast_service_discovery.pyAll public methods are thread-safe:
- Metric updates use locks
- Message updates are atomic
- Signal emissions are thread-safe (PySignal)
- Qt operations run on main thread via timers
Typical performance on modern hardware:
- Publisher: ~0.5 packets/second (with 2s timeout)
- Listener: Handles 1000+ packets/second
- Memory: ~5-10 MB per instance
- CPU: <1% during normal operation
- Metric overhead: <2% performance impact
- Adjust broadcast interval: Longer intervals reduce network traffic
- Filter by type: Use type_filter in GUI to reduce processing
- Buffer size: Increase for high-traffic networks
- Cleanup interval: Adjust SERVICE_TIMEOUT for your use case
- Check firewall allows UDP multicast (port 5004)
- Verify devices are on same network/VLAN
- Check multicast routing is enabled
- Confirm TTL is sufficient for network topology
# Linux: Enable multicast routing
sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev eth0- Check network stability
- Verify message format is valid JSON
- Ensure messages are under buffer_size (default 1400 bytes)
- Check for UDP packet loss
# Monitor errors
metrics = listener.get_metrics()
if metrics['errors'] > 10:
print("High error rate detected!")
listener.reset_metrics()Adjust timeout in service discovery:
# In vault_multicast_service_discovery.py
SERVICE_TIMEOUT_SECONDS = 60 # Increase to 60 secondsApache License 2.0
Copyright [2025] [ecki]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Contributions are welcome! Please feel free to submit a Pull Request.
git clone https://github.com/yourusername/py-vault-multicast.git
cd py-vault-multicast
pip install -r requirements.txtpython -m pytest tests/- ✨ Added integrated metrics tracking
- ✨ Added active service counting
- ✨ Enhanced GUI with real-time metrics
- ✨ Added automatic service timeout
- ✨ Improved thread safety
- ✨ Better error handling and logging
- 🐛 Fixed race condition in listener thread
- 🐛 Fixed memory leak in service tracking
- 🎉 Initial release
- Publisher and Listener classes
- PyQt6 service discovery widget
- Basic signal support
- mDNS/Zeroconf - Alternative service discovery
- PySignal - Signal/slot implementation
- PyQt6 - Python Qt bindings
- 📧 Email: support@example.com
- 🐛 Issues: GitHub Issues
- 💬 Discussions: GitHub Discussions
- ecki - Initial work and maintenance
- Thanks to all contributors
- Inspired by various service discovery protocols
- Built with Python and PyQt6