Adafruit_CircuitPython_SEN6x/adafruit_sen6x.py
2025-06-21 12:50:57 +02:00

1124 lines
41 KiB
Python

# SPDX-FileCopyrightText: Copyright (c) 2025 Liz Clark for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`adafruit_sen6x`
================================================================================
CircuitPython driver for the Sensirion SEN6x environmental sensor node
* Author(s): Liz Clark
Implementation Notes
--------------------
**Hardware:**
* `Link Text <https://www.adafruit.com/product/6331>`_"
**Software and Dependencies:**
* Adafruit CircuitPython firmware for the supported boards:
https://circuitpython.org/downloads
* Adafruit's Bus Device library: https://github.com/adafruit/Adafruit_CircuitPython_BusDevice
"""
import struct
import time
from adafruit_bus_device.i2c_device import I2CDevice
from micropython import const
try:
from typing import Any, BinaryIO, Dict, List, Optional, Tuple, Union
from busio import I2C
except ImportError:
pass
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_SEN6x.git"
# I2C addresses for SEN6x series
SEN6X_I2C_ADDRESS = const(0x6B) # SEN63C, SEN65, SEN66, SEN68
SEN60_I2C_ADDRESS = const(0x6C) # SEN60 only
# Common commands shared across all SEN6x variants
# Command ID format: 16-bit with built-in 3-bit CRC
_START_MEASUREMENT = const(0x0021)
_STOP_MEASUREMENT = const(0x0104)
_DATA_READY = const(0x0202)
_RESET = const(0xD304)
_SERIAL_NUMBER = const(0xD033)
_VERSION = const(0xD100)
_PRODUCT_NAME = const(0xD014)
_DEVICE_STATUS = const(0xD206)
_CLEAR_DEVICE_STATUS = const(0xD210)
_FAN_CLEANING = const(0x5607)
_VOC_STATE = const(0x6181)
_VOC_TUNING = const(0x60D0)
# CO2 commands (SEN66 specific)
_FORCE_CO2_RECALIBRATION = const(0x6707)
_CO2_AUTO_CALIB = const(0x6711)
_AMBIENT_PRESSURE = const(0x6720)
_SENSOR_ALTITUDE = const(0x6736)
_SHT_HEATER = const(0x6790)
# NOx algorithm commands
_NOX_TUNING = const(0x60E1)
# Temperature configuration commands
_TEMP_OFFSET = const(0x60B2)
_TEMP_ACCELERATION = const(0x6100)
# SEN66-specific commands
_SEN66_READ_MEASUREMENT = const(0x0300) # Read all measurements
_SEN66_READ_RAW_VALUES = const(0x0405) # Read raw values
_READ_NUMBER_CONCENTRATION = const(0x0316)
# Command execution times (in seconds)
_TIME_START_MEASUREMENT = const(0.050) # 50ms
_TIME_STOP_MEASUREMENT = const(1.000) # 1000ms
_TIME_DATA_READY = const(0.020) # 20ms
_TIME_READ_MEASUREMENT = const(0.020) # 20ms
_TIME_STANDARD = const(0.020) # 20ms for most commands
_TIME_MINIMAL = const(0.001) # 1ms minimum wait
_TIME_RESET = const(1.200) # 1200ms for reset
_TIME_SHT_HEATER = const(1.300) # 1300ms for SHT heater
_TIME_CO2_RECALIBRATION = const(0.500) # 500ms for CO2 recalibration
# Sensor startup time (maximum)
_SENSOR_STARTUP_TIME = const(1.0) # 1 second max startup time
# Measurement timing
_FIRST_MEASUREMENT_DELAY = const(1.1) # 1.1s until first measurement ready
_NOX_STARTUP_TIME = const(11.0) # 10-11s for NOx sensor initialization
_CO2_STARTUP_TIME = const(6.0) # 5-6s for CO2 sensor initialization
# Data value indicators
_UNKNOWN_VALUE = const(0xFFFF) # Unknown value for unsigned 16-bit
# Status register bit positions (for the 32-bit status register)
# Warning bits (upper 16 bits)
_STATUS_SPEED_WARNING = const(21)
# Error bits (lower 16 bits)
_STATUS_CO2_1_ERROR = const(12)
_STATUS_PM_ERROR = const(11)
_STATUS_HCHO_ERROR = const(10)
_STATUS_CO2_2_ERROR = const(9)
_STATUS_GAS_ERROR = const(7)
_STATUS_RHT_ERROR = const(6)
_STATUS_FAN_ERROR = const(4)
class DeviceStatus:
"""Helper class to parse the device status register"""
def __init__(self, status_data: int) -> None:
"""
Args:
status_data: 32-bit status register value
"""
self._status: int = status_data
@property
def speed_warning(self) -> bool:
"""Fan speed out of range warning"""
return bool(self._status & (1 << _STATUS_SPEED_WARNING))
@property
def co2_sensor_1_error(self) -> bool:
"""CO2 sensor 1 error (SEN68 only)
CO2 values might be unknown or wrong if this flag is set.
This is a sticky error that persists until cleared.
"""
return bool(self._status & (1 << _STATUS_CO2_1_ERROR))
@property
def pm_sensor_error(self) -> bool:
"""Particulate matter sensor error (SEN63C, SEN65, SEN66, SEN68)
PM values might be unknown or wrong if this flag is set.
RH and temperature values might be out of spec due to compensation algorithms.
This is a sticky error that persists until cleared.
"""
return bool(self._status & (1 << _STATUS_PM_ERROR))
@property
def hcho_sensor_error(self) -> bool:
"""Formaldehyde sensor error (SEN68 only)
HCHO values might be unknown or wrong if this flag is set.
This is a sticky error that persists until cleared.
"""
return bool(self._status & (1 << _STATUS_HCHO_ERROR))
@property
def co2_sensor_2_error(self) -> bool:
"""CO2 sensor 2 error (SEN66 only)
CO2 values might be unknown or wrong if this flag is set.
RH and temperature values might be out of spec due to compensation algorithms.
This is a sticky error that persists until cleared.
"""
return bool(self._status & (1 << _STATUS_CO2_2_ERROR))
@property
def gas_sensor_error(self) -> bool:
"""VOC/NOx gas sensor error (SEN65, SEN66, SEN68)
VOC index and NOx index might be unknown or wrong if this flag is set.
RH and temperature values might be out of spec due to compensation algorithms.
This is a sticky error that persists until cleared.
"""
return bool(self._status & (1 << _STATUS_GAS_ERROR))
@property
def rht_sensor_error(self) -> bool:
"""Relative humidity and temperature sensor error (SEN63C, SEN65, SEN66, SEN68)
Temperature and humidity values might be unknown or wrong if this flag is set.
Other measured values might be out of spec due to compensation algorithms.
This is a sticky error that persists until cleared.
"""
return bool(self._status & (1 << _STATUS_RHT_ERROR))
@property
def fan_error(self) -> bool:
"""Fan error - fan is mechanically blocked or broken
Fan is switched on but 0 RPM measured for multiple consecutive intervals.
All measured values are likely wrong if this error is reported.
This is a sticky error that persists until cleared.
"""
return bool(self._status & (1 << _STATUS_FAN_ERROR))
@property
def errors(self) -> bool:
"""Check if any error bits are set"""
error_mask = (
(1 << _STATUS_CO2_1_ERROR)
| (1 << _STATUS_PM_ERROR)
| (1 << _STATUS_HCHO_ERROR)
| (1 << _STATUS_CO2_2_ERROR)
| (1 << _STATUS_GAS_ERROR)
| (1 << _STATUS_RHT_ERROR)
| (1 << _STATUS_FAN_ERROR)
)
return bool(self._status & error_mask)
@property
def warnings(self) -> bool:
"""Check if any warning bits are set"""
warning_mask = 1 << _STATUS_SPEED_WARNING
return bool(self._status & warning_mask)
def __str__(self) -> str:
"""String representation of status"""
status_items: List[str] = []
if self.speed_warning:
status_items.append("Speed Warning")
if self.co2_sensor_1_error:
status_items.append("CO2-1 Error")
if self.pm_sensor_error:
status_items.append("PM Error")
if self.hcho_sensor_error:
status_items.append("HCHO Error")
if self.co2_sensor_2_error:
status_items.append("CO2-2 Error")
if self.gas_sensor_error:
status_items.append("Gas Error")
if self.rht_sensor_error:
status_items.append("RH&T Error")
if self.fan_error:
status_items.append("Fan Error")
if not status_items:
return "Status: OK"
return "Status: " + ", ".join(status_items)
class SEN6x:
"""Base class for Sensirion SEN6x environmental sensors"""
def __init__(self, i2c: I2C, address: int = SEN6X_I2C_ADDRESS) -> None:
self.i2c_device: I2CDevice = I2CDevice(i2c, address)
self._serial_number: Optional[str] = None
self._product_name: Optional[str] = None
self._measurement_started: bool = False
# Allow sensor to complete startup
time.sleep(_SENSOR_STARTUP_TIME)
def _write_command(
self, command: int, data: Optional[List[int]] = None, execution_time: float = _TIME_STANDARD
) -> None:
"""Write a command to the sensor with optional data
Args:
command: 16-bit command ID (already contains 3-bit CRC)
data: Optional list of 16-bit values to write
execution_time: Time to wait after command (seconds)
"""
buffer = struct.pack(">H", command)
with self.i2c_device as i2c:
# Write command (MSB first)
if data is None:
i2c.write(buffer)
else:
for value in data:
# Pack 16-bit value
value_bytes = struct.pack(">H", value)
# Calculate and append CRC
crc = self._crc8(value_bytes)
buffer += value_bytes + bytes([crc])
i2c.write(buffer)
# Wait for command execution
time.sleep(execution_time)
def _read_data(self, num_words: int, execution_time: float = _TIME_STANDARD) -> List[int]:
"""Data from sensor after a read command
Each word is 2 bytes + 1 CRC byte = 3 bytes per word
Args:
num_words: Number of 16-bit words to read
execution_time: Time to wait before reading (seconds)
Returns:
List of 16-bit values
"""
# Wait for command execution before reading
time.sleep(execution_time)
buffer = bytearray(num_words * 3)
with self.i2c_device as i2c:
i2c.readinto(buffer)
# Process data and check CRC
data: List[int] = []
for i in range(num_words):
word_start = i * 3
word_data = buffer[word_start : word_start + 2]
crc = buffer[word_start + 2]
# Check CRC
if self._crc8(word_data) != crc:
raise RuntimeError(f"CRC check failed for word {i}")
data.append(struct.unpack(">H", word_data)[0])
return data
@staticmethod
def _crc8(data: bytes) -> int:
"""Calculate CRC8 for Sensirion sensors
Polynomial: 0x31 (x^8 + x^5 + x^4 + 1)
Initialization: 0xFF
"""
crc = 0xFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x80:
crc = (crc << 1) ^ 0x31
else:
crc = crc << 1
crc &= 0xFF
return crc
def reset(self) -> None:
"""Reset the sensor
After reset, the sensor needs time to start up before accepting commands.
All configuration parameters are reset to default values.
"""
self._write_command(_RESET, execution_time=_TIME_RESET)
self._measurement_started = False
# Clear cached values
self._serial_number = None
self._product_name = None
# Wait for sensor to restart
time.sleep(_SENSOR_STARTUP_TIME)
def start_measurement(self) -> None:
"""Start continuous measurement mode
Once started, the sensor will continuously update its readings.
Use data_ready property to check when new data is available.
"""
if self._measurement_started:
return
self._write_command(_START_MEASUREMENT, execution_time=_TIME_START_MEASUREMENT)
self._measurement_started = True
def stop_measurement(self) -> None:
"""Stop continuous measurement mode
Note: This command takes up to 1 second to execute.
"""
if not self._measurement_started:
return
self._write_command(_STOP_MEASUREMENT, execution_time=_TIME_STOP_MEASUREMENT)
self._measurement_started = False
@property
def data_ready(self) -> bool:
"""Check if new measurement data is ready
Returns:
bool: True if new data is available
"""
if not self._measurement_started:
return False
self._write_command(_DATA_READY)
data = self._read_data(1, execution_time=_TIME_DATA_READY)
# Last bit indicates data ready status
return bool(data[0] & 0x0001)
@property
def serial_number(self) -> str:
"""The sensor serial number as ASCII string (up to 32 characters)"""
if self._serial_number is None:
self._write_command(_SERIAL_NUMBER)
# Serial number is string<32> (16 words max)
data = self._read_data(16, execution_time=_TIME_STANDARD)
# Convert to string, removing null termination
serial_bytes = b""
for word in data:
serial_bytes += struct.pack(">H", word)
self._serial_number = serial_bytes.decode("utf-8").rstrip("\x00")
return self._serial_number
@property
def product_name(self) -> str:
"""The product name (32 bytes)"""
if self._product_name is None:
self._write_command(_PRODUCT_NAME)
# Product name is 32 bytes (16 words)
data = self._read_data(16, execution_time=_TIME_STANDARD)
# Convert to string, removing null termination
name_bytes = b""
for word in data:
name_bytes += struct.pack(">H", word)
self._product_name = name_bytes.decode("utf-8").rstrip("\x00")
return self._product_name
@property
def device_status(self) -> DeviceStatus:
"""The device status register
Returns:
DeviceStatus: Object containing parsed status information
"""
self._write_command(_DEVICE_STATUS)
# Status register is 32 bits (2 words)
data = self._read_data(2, execution_time=_TIME_STANDARD)
# Combine into 32-bit value (MSB first)
status_value = (data[0] << 16) | data[1]
return DeviceStatus(status_value)
def clear_device_status(self) -> None:
"""Clear the device status register
This clears all error and warning flags. Note that if the error
condition persists, the flags will be set again. All error flags
are "sticky" - they remain set even if the error condition goes
away, until explicitly cleared by this command or a device reset.
"""
self._write_command(_CLEAR_DEVICE_STATUS, execution_time=_TIME_STANDARD)
def start_fan_cleaning(self) -> None:
"""Start the fan cleaning procedure
This accelerates the fan to maximum speed for 10 seconds to blow out
dust that has accumulated in the fan/sensor housing. This command can
only be executed when the sensor is in idle mode (not measuring).
After fan cleaning, wait at least 10 seconds before starting measurement.
Raises:
RuntimeError: If sensor is currently measuring
"""
if self._measurement_started:
raise RuntimeError(
"Cannot start fan cleaning while measuring. Call stop_measurement() first."
)
self._write_command(_FAN_CLEANING, execution_time=_TIME_STANDARD)
@property
def version(self) -> Tuple[int, int]:
"""Firmware version information
Returns:
tuple: (major_version, minor_version)
"""
self._write_command(_VERSION) # version command
data = self._read_data(1, execution_time=_TIME_STANDARD)
# Version is packed as two bytes in one word
major = (data[0] >> 8) & 0xFF
minor = data[0] & 0xFF
return (major, minor)
@property
def sht_heater_measurements(self) -> Dict[str, Optional[float]]:
"""Measurements when SHT heater is active (firmware >= 4.0)
Returns heating progress measurements. If heating not finished,
returns None values.
Returns:
dict: {'humidity': value or None, 'temperature': value or None}
"""
self._write_command(_SHT_HEATER)
data = self._read_data(2, execution_time=_TIME_STANDARD)
temp_scale = 200.0
humidity_scale = 100.0
return {
"humidity": None if data[0] == _UNKNOWN_VALUE else data[0] / humidity_scale,
"temperature": None if data[1] == _UNKNOWN_VALUE else data[1] / temp_scale,
}
def check_sensor_errors(self) -> None:
"""Check device status and raise exception if critical errors present
This is a convenience method that checks for sensor errors that would
make measurements unreliable. It's recommended to call this before
reading measurements.
Raises:
RuntimeError: If critical sensor errors are detected
"""
status = self.device_status
if status.fan_error:
raise RuntimeError("Fan error detected - measurements unreliable")
errors: List[str] = []
if status.pm_sensor_error:
errors.append("PM sensor")
if status.gas_sensor_error:
errors.append("Gas sensor")
if status.rht_sensor_error:
errors.append("RH&T sensor")
if errors:
raise RuntimeError(f"Sensor errors detected: {', '.join(errors)}")
@property
def error_status_description(self) -> Dict[str, str]:
"""Human-readable description of current errors and their effects
Returns:
dict: Dictionary with error names and their implications
"""
status = self.device_status
errors: Dict[str, str] = {}
if status.fan_error:
errors["fan"] = "Fan blocked/broken - ALL measurements unreliable"
if status.pm_sensor_error:
errors["pm"] = "PM values unreliable, RH&T may be affected"
if status.gas_sensor_error:
errors["gas"] = "VOC/NOx indices unreliable, RH&T may be affected"
if status.rht_sensor_error:
errors["rht"] = "Temperature/humidity unreliable, other values may be affected"
if status.co2_sensor_2_error:
errors["co2"] = "CO2 values unreliable, RH&T may be affected"
return errors
class SEN66(SEN6x): # noqa: PLR0904
"""Driver for SEN66 sensor - measures PM, VOC, NOx, CO2, RH, and Temperature"""
def __init__(self, i2c: I2C, address: int = SEN6X_I2C_ADDRESS) -> None:
super().__init__(i2c, address)
self._measurement_data: Optional[Dict[str, Optional[float]]] = None
self._measurement_time: Optional[float] = None
def all_measurements(self) -> Dict[str, Optional[float]]:
"""All measurement values from SEN66
Must be called when sensor is in measurement mode and data is ready.
Note: CO2 values will be 0xFFFF for first 5-6 seconds after measurement start.
Note: NOx values will be 0x7FFF for first 10-11 seconds after power-on/reset.
Returns:
dict: Dictionary containing:
- pm1_0: PM1.0 concentration (µg/m³) or None if unknown
- pm2_5: PM2.5 concentration (µg/m³) or None if unknown
- pm4_0: PM4.0 concentration (µg/m³) or None if unknown
- pm10: PM10 concentration (µg/m³) or None if unknown
- humidity: Relative humidity (%) or None if unknown
- temperature: Temperature (°C) or None if unknown
- voc_index: VOC index (0.1-50.0) or None if unknown
- nox_index: NOx index (0.1-50.0) or None if unknown
- co2: CO2 concentration (ppm) or None if unknown
Raises:
RuntimeError: If sensor is not in measurement mode
"""
if not self._measurement_started:
raise RuntimeError(
"Sensor must be in measurement mode. Call start_measurement() first."
)
self._write_command(_SEN66_READ_MEASUREMENT)
# SEN66 returns 9 values (9 words) - includes CO2
data = self._read_data(9, execution_time=_TIME_READ_MEASUREMENT)
# Scale factors from datasheet
pm_scale = 10.0
temp_scale = 200.0
humidity_scale = 100.0
voc_nox_scale = 10.0
# Track measurement time for startup detection
if self._measurement_time is None:
self._measurement_time = time.monotonic()
# Process PM values (uint16, 0xFFFF = unknown)
pm1_0: Optional[float] = None if data[0] == _UNKNOWN_VALUE else data[0] / pm_scale
pm2_5: Optional[float] = None if data[1] == _UNKNOWN_VALUE else data[1] / pm_scale
pm4_0: Optional[float] = None if data[2] == _UNKNOWN_VALUE else data[2] / pm_scale
pm10: Optional[float] = None if data[3] == _UNKNOWN_VALUE else data[3] / pm_scale
# Process RH&T values (int16, 0x7FFF = unknown)
humidity: Optional[float] = None if data[4] == _UNKNOWN_VALUE else data[4] / humidity_scale
temperature: Optional[float] = None if data[5] == _UNKNOWN_VALUE else data[5] / temp_scale
# Process VOC/NOx indices (int16, 0x7FFF = unknown)
voc_index: Optional[float] = None if data[6] == _UNKNOWN_VALUE else data[6] / voc_nox_scale
nox_index: Optional[float] = None if data[7] == _UNKNOWN_VALUE else data[7] / voc_nox_scale
# Process CO2 (uint16, 0xFFFF = unknown)
co2: Optional[float] = None if data[8] == _UNKNOWN_VALUE else float(data[8])
self._measurement_data = {
"pm1_0": pm1_0,
"pm2_5": pm2_5,
"pm4_0": pm4_0,
"pm10": pm10,
"humidity": humidity,
"temperature": temperature,
"voc_index": voc_index,
"nox_index": nox_index,
"co2": co2,
}
return self._measurement_data
def _check_measurements(self) -> None:
"""Ensure measurements have been read"""
if self._measurement_data is None:
self._measurement_data = self.all_measurements()
def raw_values(self) -> Dict[str, Optional[float]]:
"""Raw sensor values from SEN66
Returns raw sensor readings without index calculations.
Returns:
dict: Dictionary containing:
- raw_humidity: Raw humidity (%)
- raw_temperature: Raw temperature (°C)
- raw_voc: Raw VOC ticks (no scale)
- raw_nox: Raw NOx ticks (no scale)
- raw_co2: Raw CO2 concentration (ppm, updated every 5s)
"""
if not self._measurement_started:
raise RuntimeError(
"Sensor must be in measurement mode. Call start_measurement() first."
)
self._write_command(_SEN66_READ_RAW_VALUES)
data = self._read_data(5, execution_time=_TIME_READ_MEASUREMENT)
temp_scale = 200.0
humidity_scale = 100.0
return {
"raw_humidity": None if data[0] == _UNKNOWN_VALUE else data[0] / humidity_scale,
"raw_temperature": None if data[1] == _UNKNOWN_VALUE else data[1] / temp_scale,
"raw_voc": None if data[2] == _UNKNOWN_VALUE else float(data[2]),
"raw_nox": None if data[3] == _UNKNOWN_VALUE else float(data[3]),
"raw_co2": None if data[4] == _UNKNOWN_VALUE else float(data[4]),
}
def number_concentration(self) -> Dict[str, Optional[float]]:
"""Particle number concentration values
Returns:
dict: Dictionary containing number concentrations (particles/cm³):
- nc_pm0_5: PM0.5 number concentration
- nc_pm1_0: PM1.0 number concentration
- nc_pm2_5: PM2.5 number concentration
- nc_pm4_0: PM4.0 number concentration
- nc_pm10: PM10 number concentration
"""
if not self._measurement_started:
raise RuntimeError(
"Sensor must be in measurement mode. Call start_measurement() first."
)
self._write_command(_READ_NUMBER_CONCENTRATION)
data = self._read_data(5, execution_time=_TIME_READ_MEASUREMENT)
nc_scale = 10.0
return {
"nc_pm0_5": None if data[0] == _UNKNOWN_VALUE else data[0] / nc_scale,
"nc_pm1_0": None if data[1] == _UNKNOWN_VALUE else data[1] / nc_scale,
"nc_pm2_5": None if data[2] == _UNKNOWN_VALUE else data[2] / nc_scale,
"nc_pm4_0": None if data[3] == _UNKNOWN_VALUE else data[3] / nc_scale,
"nc_pm10": None if data[4] == _UNKNOWN_VALUE else data[4] / nc_scale,
}
def temperature_offset(
self, offset: float = 0.0, slope: float = 0.0, time_constant: int = 0, slot: int = 0
) -> None:
"""Temperature offset parameters for design-in compensation
Compensated temperature = ambient_temp + (slope * ambient_temp) + offset
Args:
offset: Constant temperature offset in °C (default: 0.0)
slope: Temperature dependent offset factor (default: 0.0)
time_constant: Time constant in seconds for applying changes (default: 0 = immediate)
slot: Offset slot to modify (0-4, default: 0)
Note: Configuration is volatile and reset to defaults after power cycle
"""
if not 0 <= slot <= 4:
raise ValueError("Slot must be 0-4")
# Scale factors - these are signed values
offset_scaled = int(offset * 200)
slope_scaled = int(slope * 10000)
# Convert signed to unsigned for I2C transmission
offset_scaled = offset_scaled & 0xFFFF
slope_scaled = slope_scaled & 0xFFFF
data = [offset_scaled, slope_scaled, time_constant, slot]
self._write_command(_TEMP_OFFSET, data=data, execution_time=_TIME_STANDARD)
def temperature_acceleration(
self, k: float = 10.0, p: float = 10.0, t1: float = 10.0, t2: float = 10.0
) -> None:
"""Temperature acceleration parameters for RH/T engine
Overwrites default temperature acceleration parameters.
Args:
k: Filter constant K (default: 10.0, actual = value/10)
p: Filter constant P (default: 10.0, actual = value/10)
t1: Time constant T1 in seconds (default: 10.0, actual = value/10)
t2: Time constant T2 in seconds (default: 10.0, actual = value/10)
Note: Configuration is volatile and reset to defaults after power cycle.
Must be called in idle mode.
"""
if self._measurement_started:
raise RuntimeError("Cannot set temperature acceleration while measuring.")
# Scale factors (multiply by 10 for protocol)
k_scaled = int(k * 10)
p_scaled = int(p * 10)
t1_scaled = int(t1 * 10)
t2_scaled = int(t2 * 10)
data = [k_scaled, p_scaled, t1_scaled, t2_scaled]
self._write_command(_TEMP_ACCELERATION, data=data, execution_time=_TIME_STANDARD)
def force_co2_recalibration(self, target_co2_ppm: int) -> Optional[int]:
"""Perform forced CO2 recalibration (FRC)
Forces the CO2 sensor to recalibrate to a known reference concentration.
This should be done when the sensor is in a controlled environment with
a known CO2 concentration (e.g., fresh outdoor air at ~420 ppm).
Args:
target_co2_ppm: Known CO2 concentration in ppm at current location
Returns:
int: CO2 correction applied in ppm, or None if recalibration failed
Raises:
RuntimeError: If sensor is currently measuring
Note: Sensor must be in idle mode. Wait at least 1000ms after power-on
or 600ms after stop_measurement() before calling this.
"""
if self._measurement_started:
raise RuntimeError(
"Cannot recalibrate CO2 while measuring. Call stop_measurement() first."
)
# Send target CO2 concentration
self._write_command(
_FORCE_CO2_RECALIBRATION, data=[target_co2_ppm], execution_time=_TIME_CO2_RECALIBRATION
)
# Read correction value
data = self._read_data(1, execution_time=0) # No additional wait, already waited 500ms
# Check if recalibration failed
if data[0] == 0xFFFF:
return None
# Calculate actual correction: correction = return_value - 0x8000
correction = data[0] - 0x8000
return correction
@property
def co2_automatic_self_calibration(self) -> bool:
"""CO2 sensor automatic self-calibration (ASC) status
Returns:
bool: True if ASC is enabled, False if disabled
"""
if self._measurement_started:
raise RuntimeError(
"Cannot read CO2 ASC while measuring. Call stop_measurement() first."
)
self._write_command(_CO2_AUTO_CALIB)
data = self._read_data(1, execution_time=_TIME_STANDARD)
# Data format: [padding_byte, status_byte] packed in one word
# Extract status byte (LSB)
return bool(data[0] & 0xFF)
@co2_automatic_self_calibration.setter
def co2_automatic_self_calibration(self, enabled: bool) -> None:
"""CO2 sensor automatic self-calibration (ASC) status
ASC assumes the sensor is exposed to fresh air (~400 ppm) at least
once every few days. Enable for office/home use, disable for
greenhouses or continuously occupied spaces.
Args:
enabled: True to enable ASC, False to disable
Note: Default is enabled. Setting is volatile (reset on power cycle).
"""
if self._measurement_started:
raise RuntimeError("Cannot set CO2 ASC while measuring. Call stop_measurement() first.")
# Pack padding byte (0x00) and status byte into one word
status_word = 0x0001 if enabled else 0x0000
self._write_command(_CO2_AUTO_CALIB, data=[status_word], execution_time=_TIME_STANDARD)
@property
def ambient_pressure(self) -> int:
"""Ambient pressure used for CO2 compensation
Returns:
int: Current ambient pressure in hPa (hectopascals)
"""
self._write_command(_AMBIENT_PRESSURE)
data = self._read_data(1, execution_time=_TIME_STANDARD)
return data[0]
@ambient_pressure.setter
def ambient_pressure(self, pressure_hpa: int) -> None:
"""Ambient pressure for CO2 compensation
Use this for applications with significant pressure changes.
Setting pressure overrides any altitude-based compensation.
Args:
pressure_hpa: Ambient pressure in hPa (700-1200, default: 1013)
Raises:
ValueError: If pressure is outside valid range
Note: Setting is volatile (reset to 1013 hPa on power cycle).
"""
if not 700 <= pressure_hpa <= 1200:
raise ValueError("Ambient pressure must be 700-1200 hPa")
self._write_command(_AMBIENT_PRESSURE, data=[pressure_hpa], execution_time=_TIME_STANDARD)
@property
def sensor_altitude(self) -> int:
"""Sensor altitude used for CO2 compensation
Returns:
int: Current sensor altitude in meters above sea level
"""
if self._measurement_started:
raise RuntimeError(
"Cannot read altitude while measuring. Call stop_measurement() first."
)
self._write_command(_SENSOR_ALTITUDE)
data = self._read_data(1, execution_time=_TIME_STANDARD)
return data[0]
@sensor_altitude.setter
def sensor_altitude(self, altitude_m: int) -> None:
"""Sensor altitude for CO2 compensation
Alternative to setting ambient pressure directly.
The sensor will calculate pressure based on altitude.
Args:
altitude_m: Altitude in meters (0-3000, default: 0)
Raises:
ValueError: If altitude is outside valid range
RuntimeError: If sensor is currently measuring
Note: Setting is volatile (reset to 0m on power cycle).
"""
if self._measurement_started:
raise RuntimeError(
"Cannot set altitude while measuring. Call stop_measurement() first."
)
if not 0 <= altitude_m <= 3000:
raise ValueError("Altitude must be 0-3000 meters")
self._write_command(_SENSOR_ALTITUDE, data=[altitude_m], execution_time=_TIME_STANDARD)
@property
def voc_algorithm_state(self) -> bytes:
"""VOC algorithm state for backup/restore
Can be called in either idle or measurement mode. In measurement mode,
returns the current state. In idle mode, returns the state from when
measurement was stopped.
Returns:
bytes: 8-byte algorithm state that can be restored later
"""
self._write_command(_VOC_STATE)
data = self._read_data(4, execution_time=_TIME_STANDARD) # 4 words = 8 bytes
# Convert words to bytes
state = b""
for word in data:
state += struct.pack(">H", word)
return state
@voc_algorithm_state.setter
def voc_algorithm_state(self, state: bytes) -> None:
"""Restore VOC algorithm state from backup
Allows skipping the initial VOC learning phase after power cycle.
Must be called in idle mode before starting measurement.
Args:
state: 8-byte algorithm state from get_voc_algorithm_state()
Note: Only works in idle mode, applied when measurement starts
"""
if self._measurement_started:
raise RuntimeError(
"Cannot set VOC state while measuring. Call stop_measurement() first."
)
if len(state) != 8:
raise ValueError("State must be exactly 8 bytes")
# Convert bytes to words
data: List[int] = []
for i in range(0, 8, 2):
data.append(struct.unpack(">H", state[i : i + 2])[0])
self._write_command(_VOC_STATE, data=data, execution_time=_TIME_STANDARD)
@property
def voc_algorithm(self) -> Dict[str, int]:
"""VOC algorithm tuning parameters
Returns:
dict: Current VOC algorithm parameters
"""
if self._measurement_started:
raise RuntimeError(
"Cannot read VOC tuning while measuring. Call stop_measurement() first."
)
self._write_command(_VOC_TUNING)
data = self._read_data(6, execution_time=_TIME_STANDARD)
return {
"index_offset": data[0],
"learning_time_offset_hours": data[1],
"learning_time_gain_hours": data[2],
"gating_max_duration_minutes": data[3],
"std_initial": data[4],
"gain_factor": data[5],
}
def voc_algorithm_tuning( # noqa: PLR0913 PLR0917
self,
index_offset: int = 100,
learning_time_offset_hours: int = 12,
learning_time_gain_hours: int = 12,
gating_max_duration_minutes: int = 180,
std_initial: int = 50,
gain_factor: int = 230,
) -> None:
"""VOC algorithm tuning parameters
Args:
index_offset: VOC index for average conditions (1-250, default: 100)
learning_time_offset_hours: Time constant for offset learning (1-1000, default: 12)
learning_time_gain_hours: Time constant for gain learning (1-1000, default: 12)
gating_max_duration_minutes: Max gating duration (0-3000, default: 180, 0=disabled)
std_initial: Initial standard deviation (10-5000, default: 50)
gain_factor: Output gain factor (1-1000, default: 230)
Note: Configuration is volatile and reset to defaults after power cycle
"""
if self._measurement_started:
raise RuntimeError(
"Cannot set VOC tuning while measuring. Call stop_measurement() first."
)
# Validate ranges
if not 1 <= index_offset <= 250:
raise ValueError("index_offset must be 1-250")
if not 1 <= learning_time_offset_hours <= 1000:
raise ValueError("learning_time_offset_hours must be 1-1000")
if not 1 <= learning_time_gain_hours <= 1000:
raise ValueError("learning_time_gain_hours must be 1-1000")
if not 0 <= gating_max_duration_minutes <= 3000:
raise ValueError("gating_max_duration_minutes must be 0-3000")
if not 10 <= std_initial <= 5000:
raise ValueError("std_initial must be 10-5000")
if not 1 <= gain_factor <= 1000:
raise ValueError("gain_factor must be 1-1000")
data = [
index_offset,
learning_time_offset_hours,
learning_time_gain_hours,
gating_max_duration_minutes,
std_initial,
gain_factor,
]
self._write_command(_VOC_TUNING, data=data, execution_time=_TIME_STANDARD)
@property
def nox_algorithm(self) -> Dict[str, int]:
"""NOx algorithm tuning parameters
Returns:
dict: Current NOx algorithm parameters
"""
if self._measurement_started:
raise RuntimeError(
"Cannot read NOx tuning while measuring. Call stop_measurement() first."
)
self._write_command(_NOX_TUNING)
data = self._read_data(6, execution_time=_TIME_STANDARD)
return {
"index_offset": data[0],
"learning_time_offset_hours": data[1],
"learning_time_gain_hours": data[2], # No effect for NOx
"gating_max_duration_minutes": data[3],
"std_initial": data[4], # No effect for NOx
"gain_factor": data[5],
}
def nox_algorithm_tuning(
self,
index_offset: int = 1,
learning_time_offset_hours: int = 12,
gating_max_duration_minutes: int = 720,
gain_factor: int = 230,
) -> None:
"""NOx algorithm tuning parameters
Args:
index_offset: NOx index for average conditions (1-250, default: 1)
learning_time_offset_hours: Time constant for offset learning (1-1000, default: 12)
gating_max_duration_minutes: Max gating duration (0-3000, default: 720, 0=disabled)
gain_factor: Output gain factor (1-1000, default: 230)
Note: learning_time_gain_hours is fixed at 12, std_initial is fixed at 50 for NOx.
Configuration is volatile and reset to defaults after power cycle.
"""
if self._measurement_started:
raise RuntimeError(
"Cannot set NOx tuning while measuring. Call stop_measurement() first."
)
# Validate ranges
if not 1 <= index_offset <= 250:
raise ValueError("index_offset must be 1-250")
if not 1 <= learning_time_offset_hours <= 1000:
raise ValueError("learning_time_offset_hours must be 1-1000")
if not 0 <= gating_max_duration_minutes <= 3000:
raise ValueError("gating_max_duration_minutes must be 0-3000")
if not 1 <= gain_factor <= 1000:
raise ValueError("gain_factor must be 1-1000")
# Fixed parameters for NOx
learning_time_gain_hours = 12 # Must be 12 for NOx
std_initial = 50 # Must be 50 for NOx
data = [
index_offset,
learning_time_offset_hours,
learning_time_gain_hours,
gating_max_duration_minutes,
std_initial,
gain_factor,
]
self._write_command(_NOX_TUNING, data=data, execution_time=_TIME_STANDARD)
@property
def temperature(self) -> Optional[float]:
"""Temperature in Celsius"""
self.all_measurements()
return self._measurement_data["temperature"] if self._measurement_data else None
@property
def humidity(self) -> Optional[float]:
"""Relative humidity in percent"""
self.all_measurements()
return self._measurement_data["humidity"] if self._measurement_data else None
@property
def pm2_5(self) -> Optional[float]:
"""PM2.5 concentration in µg/m³"""
self.all_measurements()
return self._measurement_data["pm2_5"] if self._measurement_data else None
@property
def voc_index(self) -> Optional[float]:
"""VOC index (0.1-50.0)"""
self.all_measurements()
return self._measurement_data["voc_index"] if self._measurement_data else None
@property
def nox_index(self) -> Optional[float]:
"""NOx index (0.1-50.0)"""
self.all_measurements()
return self._measurement_data["nox_index"] if self._measurement_data else None
@property
def co2(self) -> Optional[float]:
"""CO2 concentration in ppm"""
self.all_measurements()
return self._measurement_data["co2"] if self._measurement_data else None