Skip to content

Core

slmp.core

Core codec, types, and helpers for SLMP 3E/4E binary frames.

Classes

BlockReadResult dataclass

Result of block device read.

Source code in slmp\core.py
166
167
168
169
170
171
@dataclass(frozen=True)
class BlockReadResult:
    """Result of block device read.

    Immutable (frozen) container that keeps the per-block results of a block
    read in two separate lists, one for word blocks and one for bit blocks.
    """

    # Results for the word blocks (presumably in request order -- confirm against the caller).
    word_blocks: list[DeviceBlockResult]
    # Results for the bit blocks (presumably in request order -- confirm against the caller).
    bit_blocks: list[DeviceBlockResult]

DeviceBlockResult dataclass

Result of a single device block in block access.

Source code in slmp\core.py
158
159
160
161
162
163
@dataclass(frozen=True)
class DeviceBlockResult:
    """Result of a single device block in block access.

    Immutable (frozen) pairing of a device string with the integer values
    read for that block.
    """

    # Device string identifying the block (presumably its head device, e.g. 'D100' -- confirm).
    device: str
    # Integer values belonging to this block.
    values: list[int]

DeviceRef dataclass

Device reference.

Attributes:

Name Type Description
code str

Device code string (e.g. 'D', 'X').

number int

Device address number.

Source code in slmp\core.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@dataclass(frozen=True)
class DeviceRef:
    """Immutable reference to a single PLC device.

    Attributes:
        code: Device code string (e.g. 'D', 'X').
        number: Device address number.
    """

    code: str
    number: int

    def __str__(self) -> str:
        """Render the device as text (e.g., 'D100', 'X1F').

        The number is written in upper-case hexadecimal when the code is
        registered in DEVICE_CODES with radix 16; in every other case
        (including unregistered codes) it is written in decimal.
        """
        info = DEVICE_CODES.get(self.code)
        uses_hex = info is not None and info.radix == 16
        # An empty format spec yields the same text as plain f-string interpolation.
        digits = format(self.number, "X" if uses_hex else "")
        return f"{self.code}{digits}"
Functions
__str__()

Return the string representation of the device (e.g., 'D100', 'X1F').

Source code in slmp\core.py
84
85
86
87
88
def __str__(self) -> str:
    """Render the device as text (e.g., 'D100', 'X1F').

    The number is written in upper-case hexadecimal when the code is
    registered in DEVICE_CODES with radix 16; in every other case
    (including unregistered codes) it is written in decimal.
    """
    info = DEVICE_CODES.get(self.code)
    uses_hex = info is not None and info.radix == 16
    # An empty format spec yields the same text as plain f-string interpolation.
    digits = format(self.number, "X" if uses_hex else "")
    return f"{self.code}{digits}"

ExtendedDevice dataclass

Extended Device text plus optional qualified extension-specification override.

Source code in slmp\core.py
242
243
244
245
246
247
248
@dataclass(frozen=True)
class ExtendedDevice:
    """Extended Device text plus optional qualified extension-specification override.

    Immutable (frozen) result of parsing an extended device: the base device
    reference plus two optional override values.
    """

    # Base device reference.
    ref: DeviceRef
    # Extension specification taken from the qualifier; None when no override is given.
    extension_specification: int | None = None
    # Direct memory specification override; None when no override is given.
    direct_memory_specification: int | None = None

ExtensionSpec dataclass

Extended Device extension fields (binary, 0080..0083).

Source code in slmp\core.py
231
232
233
234
235
236
237
238
239
@dataclass(frozen=True)
class ExtensionSpec:
    """Extended Device extension fields (binary, 0080..0083).

    Immutable (frozen) set of extension values; every field has a default,
    so ExtensionSpec() can be constructed without arguments.
    """

    # Extension specification value (defaults to 0x0000).
    extension_specification: int = 0x0000
    # Extension specification modification value (defaults to 0x00).
    extension_specification_modification: int = 0x00
    # Device modification index value (defaults to 0x00).
    device_modification_index: int = 0x00
    # Device modification flags value (defaults to 0x00).
    device_modification_flags: int = 0x00
    # Direct memory specification (defaults to the DIRECT_MEMORY_NORMAL constant).
    direct_memory_specification: int = DIRECT_MEMORY_NORMAL

LabelArrayReadPoint dataclass

Request point for array label read.

Source code in slmp\core.py
115
116
117
118
119
120
121
@dataclass(frozen=True)
class LabelArrayReadPoint:
    """Request point for array label read.

    Immutable (frozen) description of one array label to read.
    """

    # Label text identifying the array to read.
    label: str
    # Unit specification value for the request (meaning defined by the SLMP label commands -- TODO confirm).
    unit_specification: int
    # Array data length for the request (unit presumably depends on unit_specification -- TODO confirm).
    array_data_length: int

LabelArrayReadResult dataclass

Result of array label read.

Source code in slmp\core.py
187
188
189
190
191
192
193
194
@dataclass(frozen=True)
class LabelArrayReadResult:
    """Result of array label read.

    Immutable (frozen) record of one array label's response fields; the
    payload is kept as undecoded bytes.
    """

    # Data type identifier for the label.
    data_type_id: int
    # Unit specification value (meaning defined by the SLMP label commands -- TODO confirm).
    unit_specification: int
    # Array data length (unit presumably depends on unit_specification -- TODO confirm).
    array_data_length: int
    # Raw, undecoded payload bytes.
    data: bytes

LabelArrayWritePoint dataclass

Request point for array label write.

Source code in slmp\core.py
124
125
126
127
128
129
130
131
@dataclass(frozen=True)
class LabelArrayWritePoint:
    """Request point for array label write.

    Immutable (frozen) description of one array label to write, including
    the raw bytes to be written.
    """

    # Label text identifying the array to write.
    label: str
    # Unit specification value for the request (meaning defined by the SLMP label commands -- TODO confirm).
    unit_specification: int
    # Array data length for the request (unit presumably depends on unit_specification -- TODO confirm).
    array_data_length: int
    # Raw payload bytes to write.
    data: bytes

LabelRandomReadResult dataclass

Result of random label read.

Source code in slmp\core.py
197
198
199
200
201
202
203
204
@dataclass(frozen=True)
class LabelRandomReadResult:
    """Result of random label read.

    Immutable (frozen) record of one label's response fields; the payload is
    kept as undecoded bytes.
    """

    # Data type identifier for the label.
    data_type_id: int
    # Spare field of the response entry (presumably reserved -- TODO confirm).
    spare: int
    # Read data length (presumably the byte length of `data` -- TODO confirm).
    read_data_length: int
    # Raw, undecoded payload bytes.
    data: bytes

LabelRandomWritePoint dataclass

Request point for random label write.

Source code in slmp\core.py
134
135
136
137
138
139
@dataclass(frozen=True)
class LabelRandomWritePoint:
    """Request point for random label write.

    Immutable (frozen) pairing of a label with the raw bytes to write to it.
    """

    # Label text identifying the write target.
    label: str
    # Raw payload bytes to write.
    data: bytes

LongTimerResult dataclass

Result of long timer read.

Source code in slmp\core.py
174
175
176
177
178
179
180
181
182
183
184
@dataclass(frozen=True)
class LongTimerResult:
    """Result of long timer read.

    Immutable (frozen) record for one long timer; it carries both decoded
    fields and the raw words.
    """

    # Index of this timer (presumably its position in the read range -- TODO confirm).
    index: int
    # Device string of the timer.
    device: str
    # Current value of the timer.
    current_value: int
    # Contact state.
    contact: bool
    # Coil state.
    coil: bool
    # Status word (presumably the source of `contact` and `coil` -- TODO confirm).
    status_word: int
    # Raw words for this timer (presumably the words the fields above were decoded from -- TODO confirm).
    raw_words: list[int]

MonitorResult dataclass

Result of registered monitor device read.

Source code in slmp\core.py
150
151
152
153
154
155
@dataclass(frozen=True)
class MonitorResult:
    """Result of registered monitor device read.

    Immutable (frozen) container with separate value lists for word and
    double-word devices.
    """

    # Values of the word devices (presumably in registration order -- TODO confirm).
    word: list[int]
    # Values of the double-word devices (presumably in registration order -- TODO confirm).
    dword: list[int]

RandomReadResult dataclass

Result of random device read.

Source code in slmp\core.py
142
143
144
145
146
147
@dataclass(frozen=True)
class RandomReadResult:
    """Result of random device read.

    Immutable (frozen) container with separate string-keyed mappings for
    word and double-word values.
    """

    # Word values by string key (presumably the device string, e.g. 'D100' -- TODO confirm).
    word: dict[str, int]
    # Double-word values by string key (presumably the device string -- TODO confirm).
    dword: dict[str, int]

SlmpResponse dataclass

Decoded SLMP response frame.

Attributes:

Name Type Description
serial int

Serial number matching the request.

target SlmpTarget

Source station routing information.

end_code int

Response end code (0x0000 for success).

data bytes

Command-specific response payload.

raw bytes

Full raw binary response frame.

Source code in slmp\core.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
@dataclass(frozen=True)
class SlmpResponse:
    """Decoded SLMP response frame.

    Immutable (frozen) record holding the header fields of a response, the
    command payload, and the complete frame it was decoded from.

    Attributes:
        serial: Serial number matching the request.
        target: Source station routing information.
        end_code: Response end code (0x0000 for success).
        data: Command-specific response payload.
        raw: Full raw binary response frame.
    """

    serial: int
    target: SlmpTarget
    end_code: int
    data: bytes
    raw: bytes

    @property
    def is_success(self) -> bool:
        """Return True if the response end_code is 0.

        Any non-zero end code yields False.
        """
        return self.end_code == 0
Attributes
is_success property

Return True if the response end_code is 0.

SlmpTarget dataclass

SLMP frame destination fields.

Attributes:

Name Type Description
network int

Network number (0x00 for local network).

station int

Station number (0xFF for control CPU).

module_io int | ModuleIONo | str

Module I/O number (0x03FF for own station).

multidrop int

Multidrop station number (0x00 for no multidrop).

Source code in slmp\core.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@dataclass(frozen=True)
class SlmpTarget:
    """Routing fields that address the destination of an SLMP frame.

    Attributes:
        network: Network number (0x00 for local network).
        station: Station number (0xFF for control CPU).
        module_io: Module I/O number (0x03FF for own station).
        multidrop: Multidrop station number (0x00 for no multidrop).
    """

    network: int = 0x00
    station: int = 0xFF
    module_io: int | ModuleIONo | str = 0x03FF
    multidrop: int = 0x00

    def __post_init__(self) -> None:
        """Normalise module_io given as a ModuleIONo member or member name.

        A string is looked up as a ModuleIONo member name (case-insensitive)
        and a ModuleIONo member is accepted directly; in both cases the
        member's ``.value`` is stored. Any other value is left untouched.

        Raises:
            ValueError: If a string does not name a ModuleIONo member.
        """
        candidate = self.module_io
        if isinstance(candidate, str):
            try:
                member = ModuleIONo[candidate.upper()]
            except KeyError:
                raise ValueError(f"unknown ModuleIONo keyword: {candidate}") from None
            numeric = member.value
        elif isinstance(candidate, ModuleIONo):
            numeric = candidate.value
        else:
            return
        # The dataclass is frozen, so normal attribute assignment is blocked.
        object.__setattr__(self, "module_io", numeric)
Functions
__post_init__()

Resolve module_io from enum name or value.

Source code in slmp\core.py
58
59
60
61
62
63
64
65
66
67
68
69
def __post_init__(self) -> None:
    """Normalise module_io given as a ModuleIONo member or member name.

    A string is looked up as a ModuleIONo member name (case-insensitive)
    and a ModuleIONo member is accepted directly; in both cases the
    member's ``.value`` is stored. Any other value is left untouched.

    Raises:
        ValueError: If a string does not name a ModuleIONo member.
    """
    candidate = self.module_io
    if isinstance(candidate, str):
        try:
            member = ModuleIONo[candidate.upper()]
        except KeyError:
            raise ValueError(f"unknown ModuleIONo keyword: {candidate}") from None
        numeric = member.value
    elif isinstance(candidate, ModuleIONo):
        numeric = candidate.value
    else:
        return
    # object.__setattr__ writes the attribute without going through the instance's own __setattr__.
    object.__setattr__(self, "module_io", numeric)

SlmpTraceFrame dataclass

A single SLMP transaction captured by a trace hook.

Source code in slmp\core.py
216
217
218
219
220
221
222
223
224
225
226
227
228
@dataclass(frozen=True)
class SlmpTraceFrame:
    """A single SLMP transaction captured by a trace hook.

    Immutable (frozen) snapshot pairing the request that was sent with the
    response that came back.
    """

    # Serial number of the transaction.
    serial: int
    # SLMP command code of the request.
    command: int
    # SLMP subcommand code of the request.
    subcommand: int
    # Command-specific request payload bytes.
    request_data: bytes
    # Complete request frame bytes.
    request_frame: bytes
    # Complete response frame bytes.
    response_frame: bytes
    # End code of the response; None presumably when no end code was available -- TODO confirm.
    response_end_code: int | None
    # Routing fields associated with the transaction.
    target: SlmpTarget
    # Monitoring timer value of the request.
    monitoring_timer: int

TypeNameInfo dataclass

Result of READ_TYPE_NAME command.

Source code in slmp\core.py
207
208
209
210
211
212
213
@dataclass(frozen=True)
class TypeNameInfo:
    """Result of READ_TYPE_NAME command.

    Immutable (frozen) record with the raw response bytes and the model
    information extracted from them.
    """

    # Raw response bytes.
    raw: bytes
    # Model name text.
    model: str
    # Numeric model code; None presumably when the response carried none -- TODO confirm.
    model_code: int | None

Functions

build_device_modification_flags(*, series, use_indirect_specification=False, register_mode='none')

Build device_modification_flags from Extended Device semantics.

register_mode
  • "none"
  • "z"
  • "lz" (iQ-R/iQ-L only)
Source code in slmp\core.py
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
def build_device_modification_flags(
    *,
    series: PLCSeries,
    use_indirect_specification: bool = False,
    register_mode: str = "none",
) -> int:
    """Build device_modification_flags from Extended Device semantics.

    The result is one byte: the high nibble encodes the register mode and
    the low nibble is 0x8 when indirect specification is requested.

    register_mode (case-insensitive):
      - "none" -> high nibble 0x0
      - "z"    -> high nibble 0x4
      - "lz"   -> high nibble 0x8 (iQ-R/iQ-L only)

    Raises:
        ValueError: If register_mode is not one of the values above, or if
            "lz" is combined with PLCSeries.QL.
    """
    high_nibble_by_mode = {"none": 0x0, "z": 0x4, "lz": 0x8}
    key = register_mode.lower()
    if key not in high_nibble_by_mode:
        raise ValueError(f"register_mode must be one of none,z,lz: {register_mode}")
    if series == PLCSeries.QL and key == "lz":
        raise ValueError("LZ register mode is not available for Q/L extension subcommands")

    low_nibble = 0x8 if use_indirect_specification else 0x0
    return (high_nibble_by_mode[key] << 4) | low_nibble

decode_3e_response(frame)

Decode a 3E response frame.

Source code in slmp\core.py
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
def decode_3e_response(frame: bytes) -> SlmpResponse:
    """Decode a 3E response frame.

    Layout read here (little-endian): subheader (2 bytes), network, station,
    module I/O (2 bytes), multidrop, response data length (2 bytes),
    end code (2 bytes), then the payload. The response data length counts
    everything after the length field, so the frame must be exactly
    9 + length bytes. 3E frames carry no serial number, so the result uses 0.

    Raises:
        SlmpError: If the frame is shorter than 11 bytes, has the wrong
            subheader, or its size disagrees with the declared length.
    """
    total = len(frame)
    if total < 11:
        raise SlmpError(f"response too short: {total} bytes")

    subheader = frame[:2]
    if subheader != FRAME_3E_RESPONSE_SUBHEADER:
        got = subheader.hex(" ").upper()
        raise SlmpError(f"unexpected 3E response subheader: {got}")

    routing = SlmpTarget(
        network=frame[2],
        station=frame[3],
        module_io=int.from_bytes(frame[4:6], "little"),
        multidrop=frame[6],
    )

    declared = int.from_bytes(frame[7:9], "little")
    expected_total = 9 + declared
    if total != expected_total:
        raise SlmpError(
            f"response size mismatch: actual={total}, expected={expected_total}, "
            f"response_data_length={declared}"
        )
    # Defensive guard: the two length checks above already imply declared >= 2.
    if declared < 2:
        raise SlmpError(f"invalid response_data_length: {declared}")

    return SlmpResponse(
        serial=0,
        target=routing,
        end_code=int.from_bytes(frame[9:11], "little"),
        data=frame[11:],
        raw=frame,
    )

decode_4e_response(frame)

Decode a 4E response frame.

Source code in slmp\core.py
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
def decode_4e_response(frame: bytes) -> SlmpResponse:
    """Decode a 4E response frame.

    Layout read here (little-endian): subheader (2 bytes), serial (2 bytes),
    two bytes that are skipped, network, station, module I/O (2 bytes),
    multidrop, response data length (2 bytes), end code (2 bytes), then the
    payload. The response data length counts everything after the length
    field, so the frame must be exactly 13 + length bytes.

    Raises:
        SlmpError: If the frame is shorter than 15 bytes, has the wrong
            subheader, or its size disagrees with the declared length.
    """
    total = len(frame)
    if total < 15:
        raise SlmpError(f"response too short: {total} bytes")

    subheader = frame[:2]
    if subheader != FRAME_4E_RESPONSE_SUBHEADER:
        got = subheader.hex(" ").upper()
        raise SlmpError(f"unexpected 4E response subheader: {got}")

    serial_number = int.from_bytes(frame[2:4], "little")
    routing = SlmpTarget(
        network=frame[6],
        station=frame[7],
        module_io=int.from_bytes(frame[8:10], "little"),
        multidrop=frame[10],
    )

    declared = int.from_bytes(frame[11:13], "little")
    expected_total = 13 + declared
    if total != expected_total:
        raise SlmpError(
            f"response size mismatch: actual={total}, expected={expected_total}, "
            f"response_data_length={declared}"
        )
    # Defensive guard: the two length checks above already imply declared >= 2.
    if declared < 2:
        raise SlmpError(f"invalid response_data_length: {declared}")

    return SlmpResponse(
        serial=serial_number,
        target=routing,
        end_code=int.from_bytes(frame[13:15], "little"),
        data=frame[15:],
        raw=frame,
    )

decode_device_dwords(data)

Decode a byte array into a list of 32-bit double-word values.

Source code in slmp\core.py
715
716
717
718
719
def decode_device_dwords(data: bytes) -> list[int]:
    """Split *data* into unsigned little-endian 32-bit values.

    Raises:
        SlmpError: If the length of *data* is not a multiple of 4.
    """
    size = len(data)
    if size % 4:
        raise SlmpError(f"dword data length must be multiple of 4: {size}")
    # Walk the four byte lanes in lockstep; lane k holds bits 8k..8k+7.
    lanes = zip(data[0::4], data[1::4], data[2::4], data[3::4])
    return [b0 | (b1 << 8) | (b2 << 16) | (b3 << 24) for b0, b1, b2, b3 in lanes]

decode_device_words(data)

Decode a byte array into a list of 16-bit word values.

Source code in slmp\core.py
708
709
710
711
712
def decode_device_words(data: bytes) -> list[int]:
    """Split *data* into unsigned little-endian 16-bit values.

    Raises:
        SlmpError: If the length of *data* is odd.
    """
    size = len(data)
    if size % 2:
        raise SlmpError(f"word data length must be even: {size}")
    # Even offsets hold the low byte, odd offsets the high byte.
    return [low | (high << 8) for low, high in zip(data[0::2], data[1::2])]

decode_response(frame, *, frame_type)

Decode an SLMP response frame based on frame_type.

Parameters:

Name Type Description Default
frame bytes

Full raw binary response frame.

required
frame_type FrameType

Expected frame format (3E or 4E).

required

Returns:

Type Description
SlmpResponse

A decoded SlmpResponse object.

Raises:

Type Description
SlmpError

If the frame is malformed or the subheader does not match.

Source code in slmp\core.py
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
def decode_response(frame: bytes, *, frame_type: FrameType) -> SlmpResponse:
    """Decode an SLMP response frame with the decoder matching frame_type.

    Args:
        frame: Full raw binary response frame.
        frame_type: Expected frame format; FrameType.FRAME_3E selects the
            3E decoder, any other value selects the 4E decoder.

    Returns:
        A decoded `SlmpResponse` object.

    Raises:
        SlmpError: If the frame is malformed or the subheader does not match.
    """
    decoder = decode_3e_response if frame_type == FrameType.FRAME_3E else decode_4e_response
    return decoder(frame)

encode_3e_request(*, target, monitoring_timer, command, subcommand, data=b'')

Encode a full 3E request frame.

Source code in slmp\core.py
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
def encode_3e_request(
    *,
    target: SlmpTarget,
    monitoring_timer: int,
    command: int,
    subcommand: int,
    data: bytes = b"",
) -> bytes:
    """Encode a full 3E request frame.

    Frame layout (multi-byte fields little-endian): subheader, network,
    station, module I/O (2 bytes), multidrop, request data length (2 bytes),
    monitoring timer (2 bytes), command (2 bytes), subcommand (2 bytes),
    payload. The request data length covers timer, command, subcommand and
    payload.

    Every numeric field is range-checked with _check_u8/_check_u16 before
    anything is serialised.
    """
    for value, label in (
        (monitoring_timer, "monitoring_timer"),
        (command, "command"),
        (subcommand, "subcommand"),
    ):
        _check_u16(value, label)
    _check_u8(target.network, "target.network")
    _check_u8(target.station, "target.station")
    module_io = int(target.module_io)
    _check_u16(module_io, "target.module_io")
    _check_u8(target.multidrop, "target.multidrop")

    routing = (
        target.network.to_bytes(1, "little")
        + target.station.to_bytes(1, "little")
        + module_io.to_bytes(2, "little")
        + target.multidrop.to_bytes(1, "little")
    )

    # 2 (timer) + 2 (command) + 2 (subcommand) + payload
    request_data_length = 6 + len(data)
    _check_u16(request_data_length, "request_data_length")

    return b"".join(
        (
            FRAME_3E_REQUEST_SUBHEADER,
            routing,
            request_data_length.to_bytes(2, "little"),
            monitoring_timer.to_bytes(2, "little"),
            command.to_bytes(2, "little"),
            subcommand.to_bytes(2, "little"),
            data,
        )
    )

encode_4e_request(*, serial, target, monitoring_timer, command, subcommand, data=b'')

Encode a full 4E request frame.

Source code in slmp\core.py
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
def encode_4e_request(
    *,
    serial: int,
    target: SlmpTarget,
    monitoring_timer: int,
    command: int,
    subcommand: int,
    data: bytes = b"",
) -> bytes:
    """Encode a full 4E request frame.

    Frame layout (multi-byte fields little-endian): subheader, serial
    (2 bytes), two zero bytes, network, station, module I/O (2 bytes),
    multidrop, request data length (2 bytes), monitoring timer (2 bytes),
    command (2 bytes), subcommand (2 bytes), payload. The request data
    length covers timer, command, subcommand and payload.

    Every numeric field is range-checked with _check_u8/_check_u16 before
    anything is serialised.
    """
    for value, label in (
        (serial, "serial"),
        (monitoring_timer, "monitoring_timer"),
        (command, "command"),
        (subcommand, "subcommand"),
    ):
        _check_u16(value, label)
    _check_u8(target.network, "target.network")
    _check_u8(target.station, "target.station")
    module_io = int(target.module_io)
    _check_u16(module_io, "target.module_io")
    _check_u8(target.multidrop, "target.multidrop")

    routing = (
        target.network.to_bytes(1, "little")
        + target.station.to_bytes(1, "little")
        + module_io.to_bytes(2, "little")
        + target.multidrop.to_bytes(1, "little")
    )

    # 2 (timer) + 2 (command) + 2 (subcommand) + payload
    request_data_length = 6 + len(data)
    _check_u16(request_data_length, "request_data_length")

    return b"".join(
        (
            FRAME_4E_REQUEST_SUBHEADER,
            serial.to_bytes(2, "little"),
            b"\x00\x00",
            routing,
            request_data_length.to_bytes(2, "little"),
            monitoring_timer.to_bytes(2, "little"),
            command.to_bytes(2, "little"),
            subcommand.to_bytes(2, "little"),
            data,
        )
    )

encode_device_spec(device, *, series)

Encode a device specification into bytes based on the PLC series.

Source code in slmp\core.py
565
566
567
568
569
570
571
572
573
574
575
576
577
578
def encode_device_spec(device: str | DeviceRef, *, series: PLCSeries) -> bytes:
    """Encode a device specification into bytes based on the PLC series.

    For PLCSeries.QL the result is 4 bytes: a 3-byte little-endian device
    number followed by the low byte of the device code. For every other
    series it is 6 bytes: a 4-byte little-endian device number followed by
    a 2-byte little-endian device code.

    Raises:
        ValueError: If an 'R' device number exceeds 32767, or the number
            does not fit the Q/L 3-byte field.
    """
    ref = parse_device(device)
    number = ref.number
    if ref.code == "R" and number > 32767:
        raise ValueError(f"R device number out of supported range (0..32767): {number}")
    info = DEVICE_CODES[ref.code]

    if series != PLCSeries.QL:
        _check_u32(number, "device.number")
        return number.to_bytes(4, "little") + info.code.to_bytes(2, "little")

    if not 0 <= number <= 0xFFFFFF:
        raise ValueError(f"device number out of range for Q/L format: {number}")
    # Q/L format carries only the low byte of the device code.
    return number.to_bytes(3, "little") + bytes([info.code & 0xFF])

encode_extended_device_spec(device, *, series, extension, include_direct_memory_at_end=True)

Encode an Extended Device specification into bytes.

Source code in slmp\core.py
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
def encode_extended_device_spec(
    device: str | DeviceRef,
    *,
    series: PLCSeries,
    extension: ExtensionSpec,
    include_direct_memory_at_end: bool = True,
) -> bytes:
    """Encode an Extended Device specification into bytes.

    The device and extension are first reconciled with
    resolve_extended_device_and_extension. Link-direct devices and devices
    selected by _uses_capture_aligned_g_hg_layout are delegated to their
    dedicated encoders. Otherwise the result is the extension fields, then
    the plain device specification, then (optionally) one byte of direct
    memory specification.
    """
    ref, resolved = resolve_extended_device_and_extension(device, extension)

    if resolved.direct_memory_specification == DIRECT_MEMORY_LINK_DIRECT:
        return _encode_link_direct_device_spec(
            ref,
            extension=resolved,
            include_direct_memory_at_end=include_direct_memory_at_end,
        )

    if _uses_capture_aligned_g_hg_layout(ref, extension=resolved):
        return _encode_capture_aligned_g_hg_extension_spec(
            ref,
            series=series,
            extension=resolved,
            include_direct_memory_at_end=include_direct_memory_at_end,
        )

    parts = [encode_extension_spec(resolved), encode_device_spec(ref, series=series)]
    if include_direct_memory_at_end:
        parts.append(resolved.direct_memory_specification.to_bytes(1, "little"))
    return b"".join(parts)

encode_extension_spec(spec)

Encode an Extended Device extension specification into bytes.

Source code in slmp\core.py
581
582
583
584
585
586
587
588
589
def encode_extension_spec(spec: ExtensionSpec) -> bytes:
    """Encode an Extended Device extension specification into bytes.

    After validation by _validate_extension_spec, the result is 5 bytes:
    extension specification (2 bytes, little-endian), extension
    specification modification, device modification index and device
    modification flags (1 byte each).
    """
    _validate_extension_spec(spec)
    layout = (
        (spec.extension_specification, 2),
        (spec.extension_specification_modification, 1),
        (spec.device_modification_index, 1),
        (spec.device_modification_flags, 1),
    )
    return b"".join(value.to_bytes(width, "little") for value, width in layout)

encode_request(*, frame_type, serial, target, monitoring_timer, command, subcommand, data=b'')

Encode an SLMP request frame based on frame_type.

Parameters:

Name Type Description Default
frame_type FrameType

SLMP frame format (3E or 4E).

required
serial int

Serial number for 4E frames (ignored for 3E).

required
target SlmpTarget

Target station routing information.

required
monitoring_timer int

Timeout value in multiples of 250ms.

required
command int

SLMP command code.

required
subcommand int

SLMP subcommand code.

required
data bytes

Command-specific binary payload.

b''

Returns:

Type Description
bytes

The full binary request frame.

Source code in slmp\core.py
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
def encode_request(
    *,
    frame_type: FrameType,
    serial: int,
    target: SlmpTarget,
    monitoring_timer: int,
    command: int,
    subcommand: int,
    data: bytes = b"",
) -> bytes:
    """Encode an SLMP request frame with the encoder matching frame_type.

    Args:
        frame_type: SLMP frame format; FrameType.FRAME_3E selects the 3E
            encoder, any other value selects the 4E encoder.
        serial: Serial number for 4E frames (ignored for 3E).
        target: Target station routing information.
        monitoring_timer: Timeout value in multiples of 250ms.
        command: SLMP command code.
        subcommand: SLMP subcommand code.
        data: Command-specific binary payload.

    Returns:
        The full binary request frame.
    """
    shared = {
        "target": target,
        "monitoring_timer": monitoring_timer,
        "command": command,
        "subcommand": subcommand,
        "data": data,
    }
    if frame_type == FrameType.FRAME_3E:
        # The 3E encoder takes no serial argument.
        return encode_3e_request(**shared)
    return encode_4e_request(serial=serial, **shared)

pack_bit_values(values)

Pack a sequence of bit values into binary format.

In SLMP binary bit-unit access, each byte contains two points. The high nibble stores the first point, and the low nibble stores the second.

Parameters:

Name Type Description Default
values Iterable[bool | int]

An iterable of boolean or integer (0/1) values.

required

Returns:

Type Description
bytes

Packed binary data.

Source code in slmp\core.py
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
def pack_bit_values(values: Iterable[bool | int]) -> bytes:
    """Pack a sequence of bit values into binary format.

    In SLMP binary bit-unit access, each byte contains two points.
    The high nibble stores the first point, and the low nibble stores the second.

    Args:
        values: An iterable of boolean or integer (0/1) values.

    Returns:
        Packed binary data.
    """
    bits = [1 if bool(v) else 0 for v in values]
    out = bytearray()
    for i in range(0, len(bits), 2):
        hi = bits[i] & 0x1
        lo = bits[i + 1] & 0x1 if i + 1 < len(bits) else 0
        out.append((hi << 4) | lo)
    return bytes(out)

parse_device(value)

Parse a device string into a DeviceRef.

Parameters:

Name Type Description Default
value str | DeviceRef

Device string (e.g. 'D100', 'X1F') or DeviceRef object.

required

Returns:

Type Description
DeviceRef

A DeviceRef object containing the device code and numeric address.

Raises:

Type Description
ValueError

If the device format is invalid or the code is unknown.

Source code in slmp\core.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def parse_device(value: str | DeviceRef) -> DeviceRef:
    """Parse a device string into a `DeviceRef`.

    The string is stripped and upper-cased, then split into a device code
    and a number. Because hexadecimal addresses may start with a letter
    (e.g. 'XFF', 'BA0'), the split is not taken from the regular expression
    alone. Every prefix of the leading letters is tried, longest first, and
    the first prefix wins that is a known device code and whose remainder is
    a valid number in that code's radix. This keeps every previously
    accepted spelling and makes the output of ``str(DeviceRef)`` parseable.

    Args:
        value: Device string (e.g. 'D100', 'X1F') or `DeviceRef` object.

    Returns:
        A `DeviceRef` object containing the device code and numeric address.
        A `DeviceRef` argument is returned unchanged.

    Raises:
        ValueError: If the device format is invalid, the code is unknown, or
            the number is not valid for the code's radix.
    """
    if isinstance(value, DeviceRef):
        return value

    text = value.strip().upper()
    match = re.fullmatch(r"([A-Z]+)([0-9A-F]+)", text)
    if not match:
        valid_codes = ", ".join(sorted(DEVICE_CODES.keys()))
        raise ValueError(
            f"Invalid SLMP device string {value!r}. "
            f"Expected format: <DeviceCode><Number> (e.g. 'D100', 'X1F'). "
            f"Valid device codes: {valid_codes}"
        )

    # Length of the leading run of letters; the code is some prefix of that run.
    letters_end = re.match(r"[A-Z]+", text).end()
    all_digits = "0123456789ABCDEF"
    for split in range(letters_end, 0, -1):
        code, num_txt = text[:split], text[split:]
        if not num_txt or code not in DEVICE_CODES:
            continue
        radix = DEVICE_CODES[code].radix
        # Accept the remainder only if every character is a digit in this radix.
        if set(num_txt) <= set(all_digits[:radix]):
            return DeviceRef(code=code, number=int(num_txt, radix))

    # No split worked: report against the split the pattern itself chose.
    code, num_txt = match.groups()
    if code not in DEVICE_CODES:
        valid_codes = ", ".join(sorted(DEVICE_CODES.keys()))
        raise ValueError(f"Unknown SLMP device code '{code}' in {value!r}. Valid codes: {valid_codes}")
    raise ValueError(
        f"Invalid device number {num_txt!r} for SLMP device code '{code}' in {value!r} "
        f"(expected base {DEVICE_CODES[code].radix})"
    )

parse_extended_device(value)

Parse an Extended Device string (e.g., 'U01\G10', 'J2\SW10') into an ExtendedDevice; a DeviceRef is wrapped without any extension override.

Source code in slmp\core.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def parse_extended_device(value: str | DeviceRef | ExtendedDevice) -> ExtendedDevice:
    r"""Parse an Extended Device string (e.g., 'U01\G10', 'J2\SW10') or return ExtendedDevice as-is.

    Accepted inputs:
      - an `ExtendedDevice`, returned unchanged;
      - a `DeviceRef`, wrapped without any override;
      - 'J<network>\<device>' (or with '/'): link direct device. The decimal
        network number becomes the extension specification and the direct
        memory specification is DIRECT_MEMORY_LINK_DIRECT;
      - 'U<hex>\<device>' (or with '/'): the hexadecimal value becomes the
        extension specification. 'G' and 'HG' devices also get a fixed
        direct memory specification;
      - any other string is parsed as a plain device without overrides.

    Raises:
        ValueError: If the device part is not a valid device string; range
            violations are reported by _check_u8/_check_u16.
    """
    # Honour the documented pass-through; previously this fell through to .strip().
    if isinstance(value, ExtendedDevice):
        return value
    if isinstance(value, DeviceRef):
        return ExtendedDevice(ref=value)

    text = value.strip().upper()

    # J-format: link direct device (e.g. 'J2\SW10')
    j_qualified = re.fullmatch(r"J(\d+)[\\/](.+)", text)
    if j_qualified:
        j_net_txt, device_txt = j_qualified.groups()
        j_network = int(j_net_txt)
        _check_u8(j_network, "extended_device j_network")
        return ExtendedDevice(
            ref=parse_device(device_txt),
            extension_specification=j_network,
            direct_memory_specification=DIRECT_MEMORY_LINK_DIRECT,
        )

    # U-format: qualifier given in hexadecimal (e.g. 'U01\G10')
    qualified = re.fullmatch(r"U([0-9A-F]+)[\\/](.+)", text)
    if qualified:
        extension_txt, device_txt = qualified.groups()
        extension_specification = int(extension_txt, 16)
        _check_u16(extension_specification, "extended_device extension_specification")
        dev_ref = parse_device(device_txt)
        # G/HG buffer memory devices have a fixed DM by device code (matches GOT pcap-verified format)
        dm: int | None = None
        if dev_ref.code == "G":
            dm = DIRECT_MEMORY_MODULE_ACCESS  # 0xF8
        elif dev_ref.code == "HG":
            dm = DIRECT_MEMORY_CPU_BUFFER  # 0xFA
        return ExtendedDevice(
            ref=dev_ref,
            extension_specification=extension_specification,
            direct_memory_specification=dm,
        )

    # No qualifier: plain device string.
    return ExtendedDevice(ref=parse_device(value))

resolve_device_subcommand(*, bit_unit, series, extension=False)

Resolve the SLMP subcommand for device read/write operations.

Source code in slmp\core.py
549
550
551
552
553
554
555
556
557
558
559
560
561
562
def resolve_device_subcommand(
    *,
    bit_unit: bool,
    series: PLCSeries,
    extension: bool = False,
) -> int:
    """Resolve the SLMP subcommand for device read/write operations.

    The choice depends on three switches: whether the extended form is
    wanted, whether the series is PLCSeries.QL (any other series uses the
    iQ-R constants), and whether access is in bit or word units.
    """
    is_ql = series == PLCSeries.QL
    if extension:
        if is_ql:
            bit_code, word_code = SUBCOMMAND_DEVICE_BIT_QL_EXT, SUBCOMMAND_DEVICE_WORD_QL_EXT
        else:
            bit_code, word_code = SUBCOMMAND_DEVICE_BIT_IQR_EXT, SUBCOMMAND_DEVICE_WORD_IQR_EXT
    elif is_ql:
        bit_code, word_code = SUBCOMMAND_DEVICE_BIT_QL, SUBCOMMAND_DEVICE_WORD_QL
    else:
        bit_code, word_code = SUBCOMMAND_DEVICE_BIT_IQR, SUBCOMMAND_DEVICE_WORD_IQR
    return bit_code if bit_unit else word_code

resolve_extended_device_and_extension(device, extension)

Resolve device and extension specification, prioritizing explicit qualification in the device string.

Source code in slmp\core.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
def resolve_extended_device_and_extension(
    device: str | DeviceRef,
    extension: ExtensionSpec,
) -> tuple[DeviceRef, ExtensionSpec]:
    """Resolve device and extension specification, prioritizing explicit qualification in the device string.

    An extension specification found in the device string replaces the one
    in *extension* whenever the two differ. A direct memory specification
    found in the device string is applied only while *extension* still
    carries DIRECT_MEMORY_NORMAL. When nothing changes, the *extension*
    object itself is returned rather than a copy.
    """
    parsed = parse_extended_device(device)
    changes = {}

    explicit_spec = parsed.extension_specification
    if explicit_spec is not None and explicit_spec != extension.extension_specification:
        changes["extension_specification"] = explicit_spec

    explicit_dm = parsed.direct_memory_specification
    if explicit_dm is not None and extension.direct_memory_specification == DIRECT_MEMORY_NORMAL:
        changes["direct_memory_specification"] = explicit_dm

    resolved = replace(extension, **changes) if changes else extension
    return parsed.ref, resolved

unpack_bit_values(data, count)

Unpack binary bit data into a list of booleans.

Parameters:

Name Type Description Default
data bytes

Binary data received from the PLC.

required
count int

The number of bit points to extract.

required

Returns:

Type Description
list[bool]

A list of boolean values.

Raises:

Type Description
SlmpError

If the data length is insufficient for the requested count.

Source code in slmp\core.py
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
def unpack_bit_values(data: bytes, count: int) -> list[bool]:
    """Unpack binary bit data into a list of booleans.

    Each byte carries two points: bit 4 (high nibble) is the earlier point
    and bit 0 (low nibble) the later one. Exactly *count* points are
    returned; surplus nibbles or bytes in *data* are ignored.

    Args:
        data: Binary data received from the PLC.
        count: The number of bit points to extract. Zero yields an empty
            list regardless of *data*.

    Returns:
        A list of boolean values.

    Raises:
        ValueError: If *count* is negative.
        SlmpError: If the data length is insufficient for the requested count.
    """
    if count < 0:
        raise ValueError(f"count must be non-negative: {count}")

    # Two points per byte, rounded up for an odd count.
    needed_bytes = (count + 1) // 2
    if len(data) < needed_bytes:
        raise SlmpError(f"bit data too short: needed {count}, got {len(data) * 2}")

    points: list[bool] = []
    for byte in data[:needed_bytes]:
        points.append(bool(byte & 0x10))  # high nibble = first device
        points.append(bool(byte & 0x01))  # low nibble = second device
    # Drop the padding nibble of an odd count (and everything when count is 0).
    return points[:count]