# Source code for joulescope.v1.stream_buffer

# Copyright 2018-2022 Jetperch LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Stream Buffer implementation for v1 backend.
"""

from .sample_buffer import SampleBuffer
from .stats import compute_stats
import numpy as np
import logging


# Layout of a single statistics record as a numpy structured data type.
# https://docs.scipy.org/doc/numpy/user/basics.rec.html
NP_STATS_NAMES = ['length', 'mean', 'variance', 'min', 'max']
NP_STATS_FORMAT = ['u8', 'f8', 'f8', 'f8', 'f8']
STATS_DTYPE = np.dtype(list(zip(NP_STATS_NAMES, NP_STATS_FORMAT)))

# The signals that have statistics, in output order.
STATS_FIELD_NAMES = ['current', 'voltage', 'power', 'current_range', 'current_lsb', 'voltage_lsb']
_STATS_FIELDS = len(STATS_FIELD_NAMES)  # current, voltage, power, current_range, current_lsb, voltage_lsb
STATS_FIELD_COUNT = _STATS_FIELDS

# Map each field name to [(field_id, index), units].
FIELDS = {
    name: [key, units]
    for name, key, units in (
        ('current', (1, 0), 'A'),
        ('voltage', (2, 0), 'V'),
        ('power', (3, 0), 'W'),
        ('current_range', (4, 0), ''),
        ('current_lsb', (5, 0), ''),
        ('voltage_lsb', (5, 1), ''),
    )
}

class StreamBuffer:
    """Efficient real-time Joulescope data buffering.

    :param duration: The total length of the buffering in seconds.
    :param frequency: The input sampling frequency in Hz.
    :param device: The device associated with this buffer.  It is stored
        but not otherwise used by this class.
    :param output_frequency: The output sampling frequency in Hz.
        None (default) uses frequency.
    """

    def __init__(self, duration, frequency, device, output_frequency=None):
        self._input_sampling_frequency = frequency
        if output_frequency is not None:
            self._sampling_frequency = output_frequency
        else:
            self._sampling_frequency = frequency
        self._buffer_duration = duration  #: total buffer duration in seconds
        self._device = device
        self._log = logging.getLogger(__name__)
        self._length = 0
        self.buffers = {}
        # None is the "no limit" sentinel tested by is_duration_max and
        # is_contiguous_duration_max.  Starting at 0 would make both report
        # True as soon as the first samples arrive.
        self._duration_max = None
        self._contiguous_duration_max = None
        self._sample_id_start = None
        self._callback = None
        self._update()

    def _update(self):
        """Recompute the buffer length and replace every sample buffer.

        The existing buffers are discarded, so any buffered data is lost.
        """
        self._length = int(self._buffer_duration * self._sampling_frequency)
        self.buffers.clear()
        self._sample_id_start = None
        self.buffers = {
            # (field_id, index): SampleBuffer
            (1, 0): SampleBuffer(self._length, dtype=np.float32, name='current'),
            (2, 0): SampleBuffer(self._length, dtype=np.float32, name='voltage'),
            (3, 0): SampleBuffer(self._length, dtype=np.float32, name='power'),
            # (converted u4->u8 by pyjoulescope_driver binding)
            (4, 0): SampleBuffer(self._length, dtype=np.uint8, name='current_range'),
            (5, 0): SampleBuffer(self._length, dtype='u1', name='gpi0'),
            (5, 1): SampleBuffer(self._length, dtype='u1', name='gpi1'),
        }

    def __len__(self):
        """Return the buffer length in output samples."""
        return self._length

    def __str__(self):
        return f'StreamBuffer(length={self._length}, reductions=[])'

    @property
    def voltage_range(self):
        """The voltage range, which is always 0 for this implementation."""
        return 0

    @property
    def has_raw(self):
        """Whether raw data is available, which is always False here."""
        return False

    @property
    def sample_id_range(self):
        """Get the range of sample ids currently available in the buffer.

        The range is the intersection of the ranges of all active buffers.

        :return: Tuple of sample_id start, sample_id end.
            Start and stop follow normal python indexing:
            start is inclusive, end is exclusive.
            Returns (None, None) when no buffer is active.
        """
        ranges = [b.range for b in self.buffers.values() if b.active]
        if not ranges:
            return None, None
        start = max(r[0] for r in ranges)
        end = min(r[1] for r in ranges)
        if end < start:
            # the active buffers do not overlap: report an empty range
            end = start
        return start, end

    def _is_duration_reached(self, limit):
        """Check if the buffered duration has reached limit.

        :param limit: The duration limit in seconds, or None for no limit.
        :return: True when the time from the first sample received to the
            end of the available range is not less than limit.
            False when limit is None or no samples have been received.
        """
        if limit is None:
            return False
        first, last = self.sample_id_range
        if first is None or self._sample_id_start is None:
            return False
        elapsed = (last - self._sample_id_start) / self._sampling_frequency
        return not elapsed < limit

    @property
    def duration_max(self):
        """The duration in seconds at which :attr:`is_duration_max` becomes
        True, or None for no limit."""
        return self._duration_max

    @duration_max.setter
    def duration_max(self, value):
        self._duration_max = value

    @property
    def is_duration_max(self):
        """True when the buffered duration has reached :attr:`duration_max`."""
        return self._is_duration_reached(self._duration_max)

    @property
    def contiguous_duration_max(self):
        """The duration in seconds at which :attr:`is_contiguous_duration_max`
        becomes True, or None for no limit."""
        return self._contiguous_duration_max

    @contiguous_duration_max.setter
    def contiguous_duration_max(self, value):
        self._contiguous_duration_max = value

    @property
    def is_contiguous_duration_max(self):
        """True when the buffered duration has reached
        :attr:`contiguous_duration_max`.

        todo: reset on sample drops.  Until then, this uses the same
        computation as :attr:`is_duration_max`.
        """
        return self._is_duration_reached(self._contiguous_duration_max)

    @property
    def callback(self):
        """The user-assigned callback.  This class stores it but never
        calls it."""
        return self._callback

    @callback.setter
    def callback(self, value):
        self._callback = value

    @property
    def input_sampling_frequency(self):
        """The input sampling frequency in Hz."""
        return self._input_sampling_frequency

    @property
    def output_sampling_frequency(self):
        """The output sampling frequency in Hz.

        Assigning a new value replaces all buffers, discarding their data.
        """
        return self._sampling_frequency

    @output_sampling_frequency.setter
    def output_sampling_frequency(self, value):
        self._sampling_frequency = value
        self._update()

    @property
    def buffer_duration(self):
        """The total buffer duration in seconds.

        Assigning a new value replaces all buffers, discarding their data.
        """
        return self._buffer_duration

    @buffer_duration.setter
    def buffer_duration(self, value):
        self._buffer_duration = value
        self._update()

    @property
    def limits_time(self):
        """The (start, end) time limits of the buffer in seconds."""
        return 0.0, len(self) / self._sampling_frequency

    @property
    def limits_samples(self):
        """The (start, end) sample id limits of the buffer.

        The end is the end of :attr:`sample_id_range` and the start is
        one full buffer length before it.
        """
        _, s_max = self.sample_id_range
        return (s_max - len(self)), s_max

    def time_to_sample_id(self, t):
        """Convert a time to a sample id.

        :param t: The time in seconds, relative to :attr:`limits_time`.
        :return: The nearest sample id, using a linear mapping from
            :attr:`limits_time` to :attr:`limits_samples`.
        """
        idx_start, idx_end = self.limits_samples
        t_start, t_end = self.limits_time
        return int(round((float(t) - t_start) / (t_end - t_start) * (idx_end - idx_start) + idx_start))

    def sample_id_to_time(self, s):
        """Convert a sample id to a time.

        :param s: The sample id.
        :return: The time in seconds, using a linear mapping from
            :attr:`limits_samples` to :attr:`limits_time`.
        """
        idx_start, idx_end = self.limits_samples
        t_start, t_end = self.limits_time
        return (int(s) - idx_start) / (idx_end - idx_start) * (t_end - t_start) + t_start

    def status(self):
        """Get the buffer status.

        :return: A dict of status entries, each a dict with 'value' and
            'units'.  This implementation reports 0 for every value.
        """
        return {
            'device_sample_id': {'value': 0, 'units': 'samples'},
            'sample_missing_count': {'value': 0, 'units': 'samples'},
            'skip_count': {'value': 0, 'units': ''},
            'sample_sync_count': {'value': 0, 'units': 'samples'},
            'contiguous_count': {'value': 0, 'units': 'samples'},
        }

    def calibration_set(self, current_offset, current_gain, voltage_offset, voltage_gain):
        """Accept calibration values.  This implementation ignores them."""
        pass

    def reset(self):
        """Clear all buffers and forget the first sample id received."""
        for b in self.buffers.values():
            b.clear()
        self._sample_id_start = None

    def insert(self, topic, value):
        """Insert streaming sample data into the matching buffer.

        :param topic: The topic for this data.  Not used.
        :param value: The dict with keys 'field_id', 'index', 'sample_id'
            and 'data', and optional keys 'sample_rate' and
            'decimate_factor'.
        :return: 0.

        Data for a (field_id, index) pair that has no buffer is ignored.
        """
        try:
            idx = (value['field_id'], value['index'])
            b = self.buffers[idx]
            sample_rate = value.get('sample_rate')
            decimate_factor = value.get('decimate_factor')
            local_decimate = 1
            # Data that reports decimate_factor == 1 at a rate other than
            # the output rate gets the integer rate ratio as its local
            # decimation factor (presumably applied by SampleBuffer.add;
            # TODO confirm).
            if decimate_factor == 1 and self._sampling_frequency != sample_rate:
                local_decimate = sample_rate // self._sampling_frequency
            b.add(value['sample_id'], value['data'],
                  sample_rate=sample_rate,
                  decimate_factor=decimate_factor,
                  local_decimate_factor=local_decimate)
            if self._sample_id_start is None:
                # latch the first sample id once the range is non-empty
                e1, e2 = self.sample_id_range
                if e1 is not None and e1 != e2:
                    self._sample_id_start = e1
        except KeyError:
            pass  # buffer does not exist, skip
        return 0

    @staticmethod
    def _stats_invalidate(stats, length=0):
        """Mark statistics entries as having no valid data.

        :param stats: The STATS_DTYPE record or array view to modify in place.
        :param length: The value to assign to the 'length' field.
        """
        stats['length'] = length
        stats['mean'] = np.nan
        stats['variance'] = np.nan
        stats['min'] = np.nan
        stats['max'] = np.nan

    def statistics_get(self, start, stop, out=None):
        """Get exact statistics over the specified range.

        :param start: The starting sample_id (inclusive).
        :param stop: The ending sample_id (exclusive).
        :param out: The optional output
            np.ndarray(STATS_FIELD_COUNT, dtype=STATS_DTYPE).
            None (default) creates and outputs a new record.
        :return: The tuple of
            (np.ndarray(STATS_FIELD_COUNT, dtype=STATS_DTYPE), [start, stop]).
            The values of start and stop will be constrained to the
            available range.

        When the constrained range is empty, and for each inactive buffer,
        the entry has length 0 and NaN for every statistic.
        """
        self_start, self_stop = self.sample_id_range
        start = max(start, self_start)
        stop = min(stop, self_stop)
        if out is None:
            out = np.zeros(_STATS_FIELDS, dtype=STATS_DTYPE)
        if start >= stop:
            # Nothing available in the requested range.  This also guards
            # against storing a negative length in the unsigned field.
            self._stats_invalidate(out)
            return out, (start, stop)
        out['length'] = stop - start
        for i, b in enumerate(self.buffers.values()):
            if not b.active:
                self._stats_invalidate(out[i])
                continue
            compute_stats(b.get_range(start, stop), out[i])
        return out, (start, stop)

    def data_get(self, start, stop, increment=None, out=None):
        """Get the samples with statistics.

        :param start: The starting sample id (inclusive).
        :param stop: The ending sample id (exclusive).
        :param increment: The number of raw samples per output entry.
            None (default) is equivalent to 1.
        :param out: The optional output
            np.ndarray((N, STATS_FIELD_COUNT), dtype=STATS_DTYPE) to
            populate with the result.  None (default) will construct and
            return a new array.
        :return: The np.ndarray((N, STATS_FIELD_COUNT), dtype=STATS_DTYPE) data.
        :raise ValueError: If out is provided but has too few entries.

        Entries that are not fully inside the available sample id range
        have length 0 and NaN statistics.  Entries for inactive buffers
        have length increment and NaN statistics.

        This method provides fast access to statistics data, primarily for
        graphical display.  Applications should prefer using
        :meth:`samples_get` which provides metadata along with the samples.
        """
        if increment is None:
            increment = 1
        self_start, self_stop = self.sample_id_range
        n_total = (stop - start) // increment
        if out is None:
            out = np.zeros((n_total, _STATS_FIELDS), dtype=STATS_DTYPE)
        elif len(out) < n_total:
            raise ValueError('out too small')

        for n in range(n_total):
            k_start = start + n * increment
            k_end = k_start + increment
            if k_start < self_start or k_end > self_stop:
                self._stats_invalidate(out[n, :])
                continue
            for i, b in enumerate(self.buffers.values()):
                if not b.active:
                    # keep the NaN entry: do not compute on an inactive buffer
                    self._stats_invalidate(out[n, i], length=increment)
                    continue
                try:
                    compute_stats(b.get_range(k_start, k_end), out[n, i])
                except KeyError:
                    pass
        return out

    def samples_get(self, start, stop, fields=None):
        """Get exact sample data without any skips or reductions.

        :param start: The starting sample id (inclusive).
        :param stop: The ending sample id (exclusive).
        :param fields: The single field or list of field names to return.
            None (default) is equivalent to
            ['current', 'voltage', 'power', 'current_range',
            'current_lsb', 'voltage_lsb'].
            Only the fields listed in the module-level FIELDS mapping are
            available.  Any other name in a list is omitted from the result.
        :return: The dict containing top-level 'time' and 'signals' keys.
            The 'time' value is a dict containing the timing metadata for
            these samples.  The 'signals' value is a dict with one key for
            each field in fields.  Each field value is also a dict with
            entries 'value' and 'units'.  The value for an inactive buffer
            is a float32 array of NaN.
            However, if a single field string is provided to fields, then
            just return that field's value.
        :raise KeyError: If a single field string is provided and that
            field is not available.

        The values of start and stop are constrained to the available range.
        """
        is_single_result = False
        if fields is None:
            fields = ['current', 'voltage', 'power', 'current_range', 'current_lsb', 'voltage_lsb']
        elif isinstance(fields, str):
            fields = [fields]
            is_single_result = True

        self_start, self_stop = self.sample_id_range
        start = max(start, self_start)
        stop = min(stop, self_stop)
        t1 = self.sample_id_to_time(start)
        t2 = self.sample_id_to_time(stop)

        result = {
            'time': {
                'range': {'value': [t1, t2], 'units': 's'},
                'delta': {'value': t2 - t1, 'units': 's'},
                'sample_id_range': {'value': [start, stop], 'units': 'samples'},
                'samples': {'value': stop - start, 'units': 'samples'},
                'input_sampling_frequency': {'value': self.input_sampling_frequency, 'units': 'Hz'},
                'output_sampling_frequency': {'value': self.output_sampling_frequency, 'units': 'Hz'},
                'sampling_frequency': {'value': self.output_sampling_frequency, 'units': 'Hz'},
            },
            'signals': {},
        }

        for field in fields:
            try:
                idx, units = FIELDS[field]
                b = self.buffers[idx]
            except KeyError:
                continue  # cannot include this signal
            if b.active:
                data = b.get_range(start, stop)
            else:
                # no data for this signal: report NaN for every sample
                data = np.full(stop - start, np.nan, dtype=np.float32)
            result['signals'][field] = {'value': data, 'units': units}

        if is_single_result:
            return result['signals'][fields[0]]['value']
        return result