Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 112 additions & 19 deletions snap7/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Drop-in replacement for the ctypes-based client with native Python implementation.
"""

import copy
import logging
import struct
import time
Expand All @@ -20,6 +21,7 @@
from .datatypes import S7WordLen
from .error import S7Error, S7ConnectionError, S7ProtocolError, S7StalePacketError
from .client_base import ClientMixin
from .optimizer import ReadItem, ReadPacket, sort_items, merge_items, packetize, extract_results

from .type import (
Area,
Expand All @@ -38,9 +40,20 @@
CDataArrayType,
)

_VALID_AREA_VALUES: frozenset[int] = frozenset(a.value for a in Area)

logger = logging.getLogger(__name__)


class _OptimizationPlan:
"""Cached optimization plan for repeated read_multi_vars calls with the same layout."""

def __init__(self, cache_key: tuple[int, ...], packets: list[ReadPacket], read_items: list[ReadItem]):
self.cache_key = cache_key
self.packets = packets
self.read_items = read_items


class Client(ClientMixin):
"""
Pure Python S7 client implementation.
Expand Down Expand Up @@ -100,6 +113,11 @@ def __init__(self, lib_location: Optional[str] = None, **kwargs: Any):
Parameter.PDURequest: 480,
}

# Multi-read optimizer state
self._opt_plan: Optional[_OptimizationPlan] = None
self.multi_read_max_gap: int = 5
self.use_optimizer: bool = True

# Async operation state
self._async_pending = False
self._async_result: Optional[bytearray] = None
Expand Down Expand Up @@ -210,6 +228,7 @@ def disconnect(self) -> int:
self.connection = None

self.connected = False
self._opt_plan = None
logger.info(f"Disconnected from {self.host}:{self.port}")
return 0

Expand Down Expand Up @@ -447,53 +466,127 @@ def write_area(self, area: Area, db_number: int, start: int, data: bytearray, wo
return 0

def read_multi_vars(self, items: Union[List[dict[str, Any]], "Array[S7DataItem]"]) -> Tuple[int, Any]:
    """Read multiple variables in a single request.

    Dict-based requests with two or more entries are routed through the
    multi-variable read optimizer, which merges adjacent reads and packs
    them into as few PDU exchanges as possible — far fewer round-trips
    than one ``read_area`` call per variable.

    Args:
        items: Either a list of item dicts (``area``, ``start``, ``size``,
            optional ``db_number``) or a ctypes ``Array[S7DataItem]``.

    Returns:
        Tuple of (result_code, data): *data* is the updated ctypes array,
        or a list of bytearrays in the original item order.

    Raises:
        ValueError: If more than MAX_VARS items are requested.
    """
    # Guard clauses: nothing to do / request too large.
    if not items:
        return (0, items)
    if len(items) > self.MAX_VARS:
        raise ValueError(f"Too many items: {len(items)} exceeds MAX_VARS ({self.MAX_VARS})")

    # Legacy ctypes path: an Array[S7DataItem] is read item-by-item and the
    # bytes are copied into each item's pData buffer, unchanged behavior.
    if hasattr(items, "_type_") and hasattr(items[0], "Area"):
        ctypes_items = cast("Array[S7DataItem]", items)
        for entry in ctypes_items:
            payload = self.read_area(Area(entry.Area), entry.DBNumber, entry.Start, entry.Amount)
            if entry.pData:
                # ctypes pointers have no slice assignment; copy byte-wise.
                for offset, value in enumerate(payload):
                    entry.pData[offset] = value
        return (0, items)

    specs = cast(List[dict[str, Any]], items)

    def _single_read(spec: dict[str, Any]) -> bytearray:
        # One plain read_area call for a single item spec.
        return self.read_area(spec["area"], spec.get("db_number", 0), spec["start"], spec["size"])

    # Optimizer only pays off with 2+ items and when enabled.
    if self.use_optimizer and len(specs) > 1:
        return self._read_multi_vars_optimized(specs)
    return (0, [_single_read(spec) for spec in specs])

def _read_multi_vars_optimized(self, dict_items: List[dict[str, Any]]) -> Tuple[int, List[bytearray]]:
    """Optimized multi-variable read using merge + packetize strategy.

    Args:
        dict_items: List of item dicts (area, db_number, start, size).

    Returns:
        Tuple of (0, list of bytearrays in original order).
    """
    # Translate each dict spec into a ReadItem, remembering its position so
    # results can be re-ordered back at the end.
    requests = [
        ReadItem(
            area=int(spec["area"]),
            db_number=spec.get("db_number", 0),
            byte_offset=spec["start"],
            bit_offset=0,
            byte_length=spec["size"],
            index=position,
        )
        for position, spec in enumerate(dict_items)
    ]

    # Layout fingerprint: flattened (area, db, offset, length) per item.
    layout_key = tuple(
        field
        for req in requests
        for field in (req.area, req.db_number, req.byte_offset, req.byte_length)
    )

    # Rebuild the plan only when the layout differs from the cached one.
    plan = self._opt_plan
    if plan is None or plan.cache_key != layout_key:
        ordered = sort_items(requests)
        merged = merge_items(ordered, max_gap=self.multi_read_max_gap, max_block_size=self._max_read_size())
        plan = _OptimizationPlan(layout_key, packetize(merged, self.pdu_length), requests)
        self._opt_plan = plan

    # Deep-copy the cached packets so filling in buffers never mutates the plan.
    live_packets = copy.deepcopy(plan.packets)

    for pkt in live_packets:
        if len(pkt.blocks) == 1:
            # Lone block: a plain read avoids the multi-read PDU overhead.
            solo = pkt.blocks[0]
            target_area = Area(solo.area) if solo.area in _VALID_AREA_VALUES else Area.DB
            solo.buffer = self.read_area(target_area, solo.db_number, solo.start_offset, solo.byte_length)
        else:
            # Several blocks: one multi-read PDU exchange for the whole packet.
            specs = [(blk.area, blk.db_number, blk.start_offset, blk.byte_length) for blk in pkt.blocks]
            reply = self._send_receive(self.protocol.build_multi_read_request(specs))
            payloads = self.protocol.extract_multi_read_data(reply, len(specs))
            for blk, payload in zip(pkt.blocks, payloads):
                blk.buffer = payload

    # Slice per-item results out of the filled blocks, in original order.
    return (0, extract_results(live_packets, len(dict_items)))

def write_multi_vars(self, items: Union[List[dict[str, Any]], List[S7DataItem]]) -> int:
Expand Down
Loading
Loading