268 lines
6.7 KiB
Python
268 lines
6.7 KiB
Python
"""Provides IPMI sensor related functions."""
|
|
from enum import auto
|
|
from enum import IntEnum
|
|
|
|
from .models import PowerSupplyFlag
|
|
from .models import Sensor
|
|
from .models import SensorStateEnum
|
|
from .models import SensorTypeEnum
|
|
from .models import SensorUnitEnum
|
|
from .util import hex_signed_int
|
|
from .util import signed_int
|
|
from .util import ten_bit_str
|
|
|
|
SENSOR_READING_SCALE = 1000
|
|
|
|
|
|
class LinearisationEnum(IntEnum):
|
|
"""Enumeration of linearisation formulas."""
|
|
|
|
LINEAR = 0
|
|
LN = auto()
|
|
LOG_10 = auto()
|
|
LOG_2 = auto()
|
|
EULER = auto()
|
|
EXP_10 = auto()
|
|
EXP_2 = auto()
|
|
ONE_DIV_X = auto()
|
|
SQR = auto()
|
|
CUBE = auto()
|
|
ONE_DIV_CUBE = auto()
|
|
|
|
|
|
def reading_conversion(data: str, m: str, b: str, rb: str) -> float:
|
|
"""Performs Sensor Reading Conversion Formula.
|
|
|
|
TODO: This is the pre-linearisation formula, so look
|
|
into the spec some more.
|
|
|
|
Extracted from IPMI 2.0 specification, section 36.3.
|
|
|
|
Args:
|
|
data: Data reading.
|
|
m: Multiplier value.
|
|
b: Offset value.
|
|
rb: RB Exponent value.
|
|
|
|
Returns:
|
|
float: Pre-linearisation reading.
|
|
"""
|
|
from math import pow
|
|
|
|
# Extracted from 43.1 - SDR Type 01h, bytes 25, 27, 30.
|
|
m_raw = ten_bit_str(m)
|
|
b_raw = ten_bit_str(b)
|
|
km_raw = int(rb, 16) >> 4
|
|
kb_raw = int(rb, 16) & 0x0F
|
|
|
|
m_data = signed_int(m_raw, 10)
|
|
b_data = signed_int(b_raw, 10)
|
|
km_data = signed_int(km_raw, 4)
|
|
kb_data = signed_int(kb_raw, 4)
|
|
|
|
sensor_data = (m_data * int(data, 16) + b_data * pow(10, kb_data)) * pow(
|
|
10, km_data
|
|
)
|
|
|
|
return float(sensor_data)
|
|
|
|
|
|
def is_threshold_sensor(er_type: str) -> bool:
|
|
"""Detects whether sensor is threshold or discrete.
|
|
|
|
Extracted from the raw Event Reading Type field.
|
|
|
|
Byte 14 - Event/Reading Type Code.
|
|
|
|
Referenced from chapter 41.
|
|
|
|
Args:
|
|
er_type: Event Reading Type in raw form.
|
|
|
|
Returns:
|
|
bool: True if is a threshold sensor.
|
|
"""
|
|
return int(er_type, 16) == 1
|
|
|
|
|
|
def is_analog_data_format(unit_type_1: str) -> bool:
|
|
"""Detects whether sensor reading is in analog data format.
|
|
|
|
Args:
|
|
unit_type_1: No clue.
|
|
|
|
Returns:
|
|
bool: True if analog data format.
|
|
"""
|
|
return (int(unit_type_1, 16) >> 6) == 2
|
|
|
|
|
|
def get_sensor_state(option: str) -> SensorStateEnum:
|
|
"""Extract sensor state from OPTION byte.
|
|
|
|
... hopefully! Actually, probably a bad guess.
|
|
|
|
Args:
|
|
option: Raw OPTION byte from IPMI response.
|
|
|
|
Returns:
|
|
SensorStateEnum: State of the sensor.
|
|
"""
|
|
if int(option, 16) & 0x40:
|
|
return SensorStateEnum.PRESENT
|
|
else:
|
|
return SensorStateEnum.NOT_PRESENT
|
|
|
|
|
|
def perform_linearisation(method: str, reading: str) -> int:
|
|
"""Perform linearisation based on a method and given reading.
|
|
|
|
Args:
|
|
method: Linearisation method specified by IPMI response.
|
|
reading: Reading obtained from IPMI response.
|
|
|
|
Returns:
|
|
int: Linearised sensor reading.
|
|
|
|
Raises:
|
|
NotImplementedError: Raised when linearisation methods have
|
|
not been implemented.
|
|
"""
|
|
i_method = int(method, 16)
|
|
|
|
if i_method == LinearisationEnum.LINEAR:
|
|
return int((reading * SENSOR_READING_SCALE)) / SENSOR_READING_SCALE
|
|
else:
|
|
raise NotImplementedError
|
|
|
|
|
|
def process_threshold_sensor(item: dict) -> Sensor:
|
|
"""Process a threshold sensor.
|
|
|
|
Args:
|
|
item: Dict representing a sensor, obtained from the IPMI response.
|
|
|
|
Returns:
|
|
Sensor: Fully populated sensor.
|
|
"""
|
|
# values
|
|
values = {}
|
|
values["reading"] = item["READING"][:2]
|
|
values["unr"] = item["UNR"]
|
|
values["uc"] = item["UC"]
|
|
values["unc"] = item["UNC"]
|
|
values["lnc"] = item["LNC"]
|
|
values["lc"] = item["LC"]
|
|
values["lnr"] = item["LNR"]
|
|
|
|
# linearisation variables
|
|
multiplier = item["M"]
|
|
offset = item["B"]
|
|
rb_exponent = item["RB"]
|
|
l_method = item["L"]
|
|
|
|
# analog data conversion
|
|
if is_analog_data_format(item["UNIT1"]):
|
|
for key, value in values.items():
|
|
values[key] = hex_signed_int(value)
|
|
|
|
# perform linearisation of readings
|
|
for key, value in values.items():
|
|
values[key] = perform_linearisation(
|
|
l_method, reading_conversion(value, multiplier, offset, rb_exponent)
|
|
)
|
|
|
|
# add a sensor and we've got a stew goin'!
|
|
sensor = Sensor()
|
|
sensor.name = item["NAME"]
|
|
sensor.type = SensorTypeEnum(int(item["STYPE"], 16))
|
|
sensor.unit = SensorUnitEnum(int(item["UNIT"], 16))
|
|
sensor.state = get_sensor_state(item["OPTION"])
|
|
sensor.reading = values["reading"]
|
|
sensor.unc = values["unc"]
|
|
sensor.uc = values["uc"]
|
|
sensor.unr = values["unr"]
|
|
sensor.lc = values["lc"]
|
|
sensor.lnc = values["lnc"]
|
|
sensor.lnr = values["lnr"]
|
|
|
|
return sensor
|
|
|
|
|
|
def process_discrete_sensor(item: dict) -> Sensor:
|
|
"""Process a discrete sensor.
|
|
|
|
TODO: finish this function.
|
|
|
|
Args:
|
|
item: Dict representing a sensor, obtained from the IPMI response.
|
|
|
|
Returns:
|
|
Sensor: Fully populated sensor.
|
|
|
|
Raises:
|
|
NotImplementedError: Raised when sensor type has not been implemented.
|
|
"""
|
|
type = int(item["STYPE"], 16)
|
|
reading = item["READING"]
|
|
# raw_reading = reading[:2]
|
|
# option = int(item["OPTION"], 16)
|
|
sensor_d = int(reading[2:4], 16)
|
|
# sensor_dmsb = int(reading[4:6], 16)
|
|
# print(f"R:{reading} RR:{raw_reading} OPT:{option}")
|
|
# print(f"SD:{sensor_d} S_DMSB:{sensor_dmsb}")
|
|
sensor_state = get_sensor_state(item["OPTION"])
|
|
|
|
# TODO: add edge-cases from utils.js
|
|
if sensor_state is not SensorStateEnum.NOT_PRESENT:
|
|
if type == SensorTypeEnum.POWER_SUPPLY:
|
|
psu = Sensor()
|
|
psu.name = item["NAME"]
|
|
psu.type = SensorTypeEnum(type)
|
|
psu.flags = PowerSupplyFlag(sensor_d)
|
|
psu.state = sensor_state
|
|
return psu
|
|
else:
|
|
raise NotImplementedError
|
|
|
|
# utils.js: ShowDiscStateAPI( Sensor_Type, sensor_d )
|
|
# servh_sensor: ProcDiscreteSensor(node,Idx)
|
|
|
|
|
|
def process_sensor_response(sensor_list: list) -> list:
|
|
"""Obtain all sensors.
|
|
|
|
Args:
|
|
sensor_list: List of sensors obtained from an XML response.
|
|
|
|
Returns:
|
|
list: Fully populated sensors.
|
|
"""
|
|
sensors = []
|
|
sensor_id = 0
|
|
for item in sensor_list:
|
|
sensor = process_sensor(item)
|
|
sensor.id = sensor_id
|
|
sensors.append(sensor)
|
|
|
|
sensor_id += 1
|
|
|
|
return sensors
|
|
|
|
|
|
def process_sensor(item: dict) -> Sensor:
|
|
"""Process a single sensor.
|
|
|
|
Args:
|
|
item: A single sensor obtained from an XML response.
|
|
|
|
Returns:
|
|
Sensor: Fully populated sensor.
|
|
"""
|
|
if is_threshold_sensor(item["ERTYPE"]):
|
|
sensor = process_threshold_sensor(item)
|
|
else:
|
|
sensor = process_discrete_sensor(item)
|
|
|
|
return sensor
|