Full Cycle

In this lecture, we revisit a full cycle of tinyML deployment. For the sake of simplicity, let’s envision that this cycle is motivated by a need for sensors to detect swerving movement of drivers (sudden side-to-side).

Stage 1: Data Generation

We start by developing a firmware sketch for the Nano33BLE device that can detect side-to-side movement. Before we can detect something, we must be able to observe it. Let’s start with observing the Nano33BLE’s IMU sensors.

Accelerometer axes
Initial firmware sketch for data collection
  • Set up a project called Swerve_data_collect inside the firmware directory with the following file contents:

  • platformio.ini:

1
2
3
4
5
6
7
[env:nano33ble]
platform = nordicnrf52
board = nano33ble
framework = arduino
lib_deps = 
    arduino-libraries/Arduino_BMI270_BMM150
monitor_speed = 9600
  • main.cpp:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <Arduino.h>
#include <Arduino_BMI270_BMM150.h>

// One-time initialisation: opens the USB serial link at 9600 baud, waits for
// the host to connect, then starts the IMU. Progress messages are printed in
// the order the steps actually happen.
void setup() {
  Serial.begin(9600);
  while (!Serial) {
    ;  // wait for serial monitor
  }
  // Announce the IMU start-up *before* attempting it so the log reflects the
  // real sequence of events.
  Serial.println("Serial ready. Initializing IMU...");

  if (!IMU.begin()) {
    Serial.println("Failed to initialize IMU!");
    while (1) {
      delay(1000);  // Nothing can be sampled without the IMU; stay here.
    }
  }

  Serial.println("IMU ready.");
  Serial.println("Ax");  // Header line for the values printed by loop().
}

// Prints the X-axis acceleration with three decimals, one value per line,
// then waits 100 ms. Iterations in which the IMU has no new sample are
// skipped, so an uninitialised value is never printed.
void loop() {
  if (!IMU.accelerationAvailable()) {
    return;  // No fresh reading yet; poll again on the next call.
  }

  float ax = 0.0f;
  float ay = 0.0f;
  float az = 0.0f;
  IMU.readAcceleration(ax, ay, az);

  char line[16];
  snprintf(line, sizeof(line), "%.3f", ax);
  Serial.println(line);
  delay(100);
}
  • Compile and upload.
  • Data collected from an initial run of the above sketch are shown in the following Figure
Single entry data collection from sensors
  • The red rectangles indicate the moments when the chip was moved suddenly side-to-side.
  • Problem: the individual data points do not rise and then fall monotonically during a side-to-side movement; instead, there are dips within the side-to-side range. This suggests that sampling a single data point every 100 ms (delay(100)) is too coarse to capture the movement well.
  • Reducing delay using the current code will cause more data to be emitted.
    • Perhaps we need to report values averaged over a short time window instead?
Firmware sketch for windowed data collection
  • Update main.cpp with the following:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <Arduino.h>
#include <Arduino_BMI270_BMM150.h>

// Number of samples in the moving-average window. It must be a power of two
// because the ring-buffer index wraps with a bitwise AND against BUFFER_MASK.
constexpr int WINDOW_SIZE = 8;
constexpr int BUFFER_MASK = WINDOW_SIZE - 1;
static_assert(WINDOW_SIZE > 0 && (WINDOW_SIZE & BUFFER_MASK) == 0,
              "WINDOW_SIZE must be a power of two");

float buffer[WINDOW_SIZE];   // Ring buffer holding the values in the window.
int head = 0;                // Index of the slot overwritten next.
float running_sum = 0.0f;    // Sum of everything currently in buffer.
float average = 0.0f;        // running_sum / WINDOW_SIZE, refreshed by loop().

// One-time initialisation: opens the USB serial link at 9600 baud, waits for
// the host to connect, then starts the IMU. Progress messages are printed in
// the order the steps actually happen.
void setup() {
  Serial.begin(9600);
  while (!Serial) {
    ;  // wait for serial monitor
  }
  // Announce the IMU start-up *before* attempting it so the log reflects the
  // real sequence of events.
  Serial.println("Serial ready. Initializing IMU...");

  if (!IMU.begin()) {
    Serial.println("Failed to initialize IMU!");
    while (1) {
      delay(1000);  // Nothing can be sampled without the IMU; stay here.
    }
  }

  Serial.println("IMU ready.");
  Serial.println("Ax");  // Header line for the values printed by loop().
}

// Pushes the square of the latest X-axis acceleration into the ring buffer,
// updates the windowed average and prints it with three decimals. Iterations
// without a new IMU sample are skipped so that an uninitialised value can
// never enter the running sum.
void loop() {
  if (!IMU.accelerationAvailable()) {
    return;  // No fresh reading yet; poll again on the next call.
  }

  float ax = 0.0f;
  float ay = 0.0f;
  float az = 0.0f;
  IMU.readAcceleration(ax, ay, az);

  // Replace the oldest entry and keep the sum in step with the buffer.
  const float squared = ax * ax;
  running_sum -= buffer[head];
  buffer[head] = squared;
  running_sum += squared;
  head = (head + 1) & BUFFER_MASK;  // Wrap-around; needs a power-of-two size.
  average = running_sum / WINDOW_SIZE;

  char line[16];
  snprintf(line, sizeof(line), "%.3f", average);
  Serial.println(line);
  delay(10);
}
  • Compile and upload.
  • In the updated source code, the following changes are made:
    • We implement a circular buffer using a fixed size array (float buffer[WINDOW_SIZE]) of size WINDOW_SIZE.
      • Once the buffer is full, the data will be wrapped around in a circular fashion.
    • WINDOW_SIZE should be a power of 2 to help quickly calculate wrap-around using bitwise operation (head = (head + 1) & BUFFER_MASK;).
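    • We don’t care whether the reading is negative or positive (only how large it is), and we want to magnify large readings relative to small ones, so each sample is squared before it enters the window: buffer[head] = ax * ax; running_sum += buffer[head];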
  • Data points appear in a less fluctuating fashion, but it is still challenging to identify data segments that match up with the side-to-side movement.
Windowed data collection from sensors

Stage 2: Data Engineering

The board is generating data now, and it looks reasonable in the serial monitor. However, for data engineering purposes, we need a better way of observing the data. The next step is therefore to move the data over to a PC via USB serial for visualization and analysis.

Firmware sketch for data transfer to PC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <Arduino.h>
#include <Arduino_BMI270_BMM150.h>

// Number of samples in the moving-average window. It must be a power of two
// because the ring-buffer index wraps with a bitwise AND against BUFFER_MASK.
constexpr int WINDOW_SIZE = 8;
constexpr int BUFFER_MASK = WINDOW_SIZE - 1;
static_assert(WINDOW_SIZE > 0 && (WINDOW_SIZE & BUFFER_MASK) == 0,
              "WINDOW_SIZE must be a power of two");

float buffer[WINDOW_SIZE];   // Ring buffer holding the values in the window.
int head = 0;                // Index of the slot overwritten next.
float running_sum = 0.0f;    // Sum of everything currently in buffer.
float average = 0.0f;        // running_sum / WINDOW_SIZE, refreshed by loop().

// One-time initialisation: opens the USB serial link at 9600 baud, waits for
// the host to connect, then starts the IMU. Progress messages are printed in
// the order the steps actually happen.
void setup() {
  Serial.begin(9600);
  while (!Serial) {
    ;  // wait for serial monitor
  }
  // Announce the IMU start-up *before* attempting it so the log reflects the
  // real sequence of events.
  Serial.println("Serial ready. Initializing IMU...");

  if (!IMU.begin()) {
    Serial.println("Failed to initialize IMU!");
    while (1) {
      delay(1000);  // Nothing can be sampled without the IMU; stay here.
    }
  }

  Serial.println("IMU ready.");
  // loop() streams two '|'-separated fields, so the header names both:
  // the raw X acceleration and the windowed average of its square.
  Serial.println("Ax|AvgAxSq");
}

// Streams one "ax|average" record per new IMU sample, both values with three
// decimals, where average is the windowed mean of ax squared.
//
// The output buffer is sized for two formatted floats plus the separator; a
// 10-byte buffer would silently truncate the record in the middle of the
// second field. Iterations without a new IMU sample are skipped so that an
// uninitialised value can never enter the running sum.
void loop() {
  if (!IMU.accelerationAvailable()) {
    return;  // No fresh reading yet; poll again on the next call.
  }

  float ax = 0.0f;
  float ay = 0.0f;
  float az = 0.0f;
  IMU.readAcceleration(ax, ay, az);

  // Replace the oldest entry and keep the sum in step with the buffer.
  const float squared = ax * ax;
  running_sum -= buffer[head];
  buffer[head] = squared;
  running_sum += squared;
  head = (head + 1) & BUFFER_MASK;  // Wrap-around; needs a power-of-two size.
  average = running_sum / WINDOW_SIZE;

  char line[32];
  snprintf(line, sizeof(line), "%.3f|%.3f", ax, average);
  Serial.println(line);
  delay(10);
}

Python Notebook

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from serial.tools import list_ports
# List every serial port so the user can see what is attached.
ports = list(list_ports.comports())

if not ports:
    print("No serial ports found. Check the USB cable, board connection, and drivers.")
else:
    for p in ports:
        print(f"{p.device:20s} | {p.description} | {p.hwid}")

# Pick the first port whose description mentions the board. Fail with an
# explanatory error instead of a bare IndexError when nothing matches.
BOARD_DESCRIPTION = "Nano 33 BLE"
board_ports = [p for p in ports if BOARD_DESCRIPTION in (p.description or "")]
if not board_ports:
    raise RuntimeError(
        f"No serial port describing itself as '{BOARD_DESCRIPTION}' was found. "
        "Check the port list printed above and the board connection."
    )

PORT = board_ports[0].device
BAUD_RATE = 9600  # Must match Serial.begin() in the firmware sketch.
READ_TIMEOUT_SECONDS = 1

print(f"Using port: {PORT}")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
from datetime import datetime

# Field names, in the order the firmware prints them ("accel_x|avg_accel_x").
SENSOR_DATA = ["accel_x", "avg_accel_x"]


def parse_sensor_line(line: str) -> dict:
    """Turn one '|'-separated serial line into a data-point dictionary.

    Args:
        line: Text received from the board, e.g. "0.045|0.001".

    Returns:
        A dict with "timestamp" (ISO string, millisecond precision), "raw"
        (the stripped input) and one float per name in SENSOR_DATA. The sensor
        fields stay at 0.0 when the line has fewer than two fields or contains
        a field that is not a valid number (header text, a partially received
        line, undecodable bytes), so a single bad line cannot abort a capture.
    """
    line = line.strip()

    data_point = {
        "timestamp": datetime.now().isoformat(timespec="milliseconds"),
        "raw": line,
        "accel_x": 0.0,
        "avg_accel_x": 0.0,
    }

    try:
        numbers = [float(x) for x in line.split("|")]
    except ValueError:
        # Not a numeric record; keep the defaults and preserve the raw text.
        return data_point

    if len(numbers) >= 2:
        for name, value in zip(SENSOR_DATA, numbers[:2]):
            data_point[name] = value
    return data_point


# Quick parser test
test_line = "0.045|0.001"
parse_sensor_line(test_line)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import pandas as pd
import serial

def collect_sensor_data(port: str=PORT, baud_rate: int=BAUD_RATE, seconds: int=10):
    """Read sensor lines from the board for a fixed duration.

    Args:
        port: Serial device to open.
        baud_rate: Serial speed; must match the firmware.
        seconds: How long to keep reading.

    Returns:
        A pandas DataFrame with one row per received line: the fields produced
        by parse_sensor_line plus "elapsed_seconds" since collection started.
        If the run is interrupted from Jupyter, the rows gathered so far are
        still returned.
    """
    points = []
    print(f"Opening {port} at {baud_rate} baud...")

    with serial.Serial(port, baud_rate, timeout=READ_TIMEOUT_SECONDS) as ser:
        time.sleep(2)
        ser.reset_input_buffer()
        # The buffer was probably cleared in the middle of a line, so the first
        # line read afterwards may be incomplete; throw it away.
        ser.readline()

        print("Collecting data. Press the stop button in Jupyter to interrupt.")
        # A monotonic clock keeps the elapsed time immune to wall-clock changes.
        start = time.monotonic()
        try:
            while time.monotonic() - start < seconds:
                raw = ser.readline()
                if not raw:
                    continue  # Read timed out without data.

                line = raw.decode("utf-8", errors="replace").strip()
                if not line:
                    continue

                data_point = parse_sensor_line(line)
                data_point["elapsed_seconds"] = time.monotonic() - start
                points.append(data_point)
        except KeyboardInterrupt:
            # Honour the message above: stop early but keep what was captured.
            print("Interrupted; keeping the data collected so far.")

    df = pd.DataFrame(points)
    print(f"Collected {len(df)} data points.")
    return df

df = collect_sensor_data(seconds=10)
df.head()
1
2
3
4
5
6
7
8
9
10
11
12
13
import matplotlib.pyplot as plt
from IPython.display import clear_output, display

# Draw each captured series against elapsed time on one shared set of axes.
columns = ("accel_x", "avg_accel_x")
fig, axes = plt.subplots(figsize=(10, 5))
elapsed = df["elapsed_seconds"]
for series_name in columns:
    axes.plot(elapsed, df[series_name], label=series_name)
axes.set_xlabel("Elapsed time (seconds)")
axes.set_ylabel("Sensor value")
axes.set_title("Live Nano 33 BLE Sensor Stream")
axes.legend(loc="upper right")
axes.grid(True)
plt.show()
Visualizing collected sensor data
1
2
3
4
5
6
7
8
9
10
11
import numpy as np

# Label every row by comparing the windowed average with a fixed threshold:
# "Y" at or above it, "N" below it. Rows matching neither rule (for example a
# missing value) receive the default "Unknown".
threshold = 0.1
avg_series = df["avg_accel_x"]
label_rules = {
    "Y": avg_series >= threshold,
    "N": avg_series < threshold,
}
conditions = list(label_rules.values())
labels = list(label_rules.keys())

df["Label"] = np.select(conditions, labels, default="Unknown")
df.head(50)
1
2
3
4
5
6
7
8
9
10
from pathlib import Path

# Make sure the output folder exists, then store the capture as CSV.
DATA_DIR = Path("data")
DATA_DIR.mkdir(parents=True, exist_ok=True)
output_file = DATA_DIR.joinpath("nano33ble_sensor_capture.csv")

# Leave out the first and last ten rows of the capture before saving.
edge_rows = 10
df_cleaned = df.iloc[edge_rows:-edge_rows]
df_cleaned.to_csv(output_file, index=False)

print(f"Saved {len(df_cleaned)} rows to {output_file}")

Stage 3: Model Development

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import pandas as pd
from sklearn.model_selection import train_test_split
from pathlib import Path

DATA_DIR = Path("data")
input_file = DATA_DIR / "nano33ble_sensor_capture.csv"

FEATURE_COL = "avg_accel_x"
LABEL_COL = "Label"
LABEL_TO_CLASS = {"Y": 1, "N": 0}

df = pd.read_csv(input_file)
df[FEATURE_COL] = pd.to_numeric(df[FEATURE_COL], errors="coerce")

# Coercion turns unparsable features into NaN, and labels outside
# LABEL_TO_CLASS (such as "Unknown") map to NaN as well. Either would poison
# the mean/std below and the training loss, so keep fully usable rows only.
class_ids = df[LABEL_COL].map(LABEL_TO_CLASS)
usable_rows = df[FEATURE_COL].notna() & class_ids.notna()
dropped_rows = int((~usable_rows).sum())
if dropped_rows:
    print(f"Dropping {dropped_rows} rows with a missing feature or unknown label.")

X_data = df.loc[usable_rows, [FEATURE_COL]].to_numpy()  # Shaped as (N, 1) for the network
y_data = class_ids[usable_rows].astype(int).to_numpy()  # Shape (N,), 1 for "Y" and 0 for "N"

X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, test_size=0.20, random_state=42)

# Calculate normalization constants from training data only, so that no
# information from the test split leaks into the scaling.
X_mean = X_train.mean()
X_std = X_train.std()
if X_std == 0:
    raise ValueError("Training feature has zero variance; cannot standardize it.")

# Manually scale the training and testing sets
X_train_scaled = (X_train - X_mean) / X_std
X_test_scaled = (X_test - X_mean) / X_std

# The same two constants must be applied to every input given to the model.
print(f"Scaling Parameters: Mean: {X_mean}, Std: {X_std}")
print(f"Training samples: {len(X_train)} | Testing samples: {len(X_test)}")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import tensorflow as tf

# Small binary classifier: one scalar feature in, one probability out.
network_layers = [
    tf.keras.layers.Input(shape=(1,)),  # Explicit input shape for TFLite
    tf.keras.layers.Dense(8, activation="relu"),
    tf.keras.layers.Dense(1, activation="sigmoid"),  # Sigmoid for binary classification
]
model = tf.keras.Sequential(network_layers)

model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)

# Train on the scaled training split; the scaled test split is passed as
# validation data.
model.fit(
    X_train_scaled,
    y_train,
    epochs=20,
    batch_size=16,
    validation_data=(X_test_scaled, y_test),
)

Visualizing collected sensor data
1
2
3
4
5
6
7
8
9
10
11
12
import pathlib

# Folder that receives the converted model.
model_dir = pathlib.Path("./models")
model_dir.mkdir(parents=True, exist_ok=True)

# Convert the trained Keras model with the converter's default optimizations.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

# Write the converted model to disk; the cell output is the byte count.
tflite_model_file = model_dir.joinpath("model.tflite")
tflite_model_file.write_bytes(tflite_model)

Stage 4: Live Validation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import tensorflow as tf
import numpy as np
import matplotlib.pylab as plt
from ai_edge_litert.interpreter import Interpreter
from pathlib import Path


# Load the converted model and allocate its tensors.
model_dir = Path("./models")
tflite_model_file = model_dir.joinpath("model.tflite")

interpreter = Interpreter(model_path=str(tflite_model_file))
interpreter.allocate_tensors()

# The first input and first output tensors are the ones used from here on.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
first_input = input_details[0]
first_output = output_details[0]

input_index = first_input["index"]
output_index = first_output["index"]
input_shape = first_input["shape"]
input_dtype = first_input["dtype"]

print("Input shape expected by TFLite:", input_shape)
print("Input dtype expected by TFLite:", input_dtype)
print("Output shape:", first_output["shape"])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import serial
from serial.tools import list_ports
import time
from datetime import datetime
import pandas as pd

# List every serial port so the user can see what is attached.
ports = list(list_ports.comports())

if not ports:
    print("No serial ports found. Check the USB cable, board connection, and drivers.")
else:
    for p in ports:
        print(f"{p.device:20s} | {p.description} | {p.hwid}")

# Pick the first port whose description mentions the board. Fail with an
# explanatory error instead of a bare IndexError when nothing matches.
BOARD_DESCRIPTION = "Nano 33 BLE"
board_ports = [p for p in ports if BOARD_DESCRIPTION in (p.description or "")]
if not board_ports:
    raise RuntimeError(
        f"No serial port describing itself as '{BOARD_DESCRIPTION}' was found. "
        "Check the port list printed above and the board connection."
    )

PORT = board_ports[0].device
BAUD_RATE = 9600
READ_TIME = 10
READ_TIMEOUT_SECONDS = 1

# The model was trained on standardized input, (x - mean) / std, so live data
# must be scaled with the same constants. These are the "Scaling Parameters"
# printed by the model-development notebook; update them after retraining.
X_MEAN = 0.0636779359430605
X_STD = 0.126740584005968

print(f"Using port: {PORT}")

points = []
print(f"Opening {PORT} at {BAUD_RATE} baud for {READ_TIME} seconds...")

with serial.Serial(PORT, BAUD_RATE, timeout=READ_TIMEOUT_SECONDS) as ser:
    time.sleep(2)
    ser.reset_input_buffer()

    print("Collecting data. Press the stop button in Jupyter to interrupt.")
    start = time.time()

    while time.time() - start < READ_TIME:
        raw = ser.readline()
        if not raw:
            continue  # Read timed out without data.

        line = raw.decode("utf-8", errors="replace").strip()
        if not line:
            continue

        # Header text or a partially received line is reported and skipped
        # rather than aborting the whole run with a ValueError.
        try:
            numbers = [float(x) for x in line.split("|")]
        except ValueError:
            print(f"Unexpected data format: '{line}'")
            continue
        if len(numbers) < 2:
            print(f"Unexpected data format: '{line}'")
            continue

        # Apply the training-time scaling before handing the value to the model.
        scaled_avg = (numbers[1] - X_MEAN) / X_STD
        input_np = np.array(scaled_avg, dtype=input_dtype).reshape(input_shape)
        interpreter.set_tensor(input_index, input_np)
        interpreter.invoke()
        pred = 1 if interpreter.get_tensor(output_index)[0][0] > 0.5 else 0

        data_point = {"timestamp": datetime.now().isoformat(timespec="milliseconds"),
                      "raw": line,
                      "accel_x": numbers[0],
                      "avg_accel_x": numbers[1],
                      "elapsed_seconds": time.time() - start,
                      "prediction": pred
        }
        points.append(data_point)

df = pd.DataFrame(points)
print(f"Collected {len(df)} data points.")

df.head(100)
1
2
3
4
5
6
7
8
9
10
11
12
13
import matplotlib.pyplot as plt
from IPython.display import clear_output, display

# Draw the sensor series and the model's prediction against elapsed time on
# one shared set of axes.
columns = ("accel_x", "avg_accel_x", "prediction")
fig, axes = plt.subplots(figsize=(10, 5))
elapsed = df["elapsed_seconds"]
for series_name in columns:
    axes.plot(elapsed, df[series_name], label=series_name)
axes.set_xlabel("Elapsed time (seconds)")
axes.set_ylabel("Sensor value")
axes.set_title("Live Nano 33 BLE Sensor Stream")
axes.legend(loc="upper right")
axes.grid(True)
plt.show()
Visualizing sensor data and predictions

Stage 5: Deployment

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from pathlib import Path

model_dir = Path("./models")
tflite_path = model_dir/'model.tflite'
cc_path = model_dir/'model.cc'

# Only `Path` is imported in this cell, so use the Path objects directly
# instead of going through the (unimported) `pathlib` module name.
data = tflite_path.read_bytes()

# Format the model as rows of comma-separated hex bytes. The output holds the
# array *contents* only; the surrounding C declaration is added separately.
BYTES_PER_ROW = 12
lines = [
    "  " + ", ".join(f"0x{byte:02x}" for byte in data[i:i + BYTES_PER_ROW]) + ",\n"
    for i in range(0, len(data), BYTES_PER_ROW)
]

cc_path.write_text("".join(lines), encoding="utf-8")
# len(data) is the number of bytes in the model, i.e. the C array length.
print(f"Wrote C array to {cc_path} with {len(data)} bytes of data.")

print("Wrote:", tflite_path, "to", cc_path)
# This is the size of the generated text file, not the array length.
print("C array size:", cc_path.stat().st_size, "bytes")
1
2
3
# Print the whole generated C source file, one row per line.
generated_rows = Path(cc_path).read_text(encoding="utf-8").splitlines()
print(*generated_rows, sep="\n")
1
2
3
4
5
6
7
8
9
10
[env:nano33ble]
platform = nordicnrf52
board = nano33ble
framework = arduino
monitor_speed = 9600

lib_deps =
    arduino-libraries/ArduinoBLE
    arduino-libraries/Arduino_BMI270_BMM150
    tinymlx/Harvard_TinyMLx
1
2
3
4
// TFLite model embedded as a byte array. Replace the placeholder below with
// the hex rows generated by the notebook.
//
// Both objects are defined `extern const` so that they (a) are read-only and
// (b) have external linkage with exactly the types the firmware declares:
// `extern const unsigned char model_data[]` and `extern const int model_data_len`.
extern const unsigned char model_data[] = {
  BYTE ARRAY GOES HERE
};
// Must equal the model's byte count reported by the notebook; update it
// whenever the model is regenerated.
extern const int model_data_len = 11422;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#include <Arduino.h>
#include <Arduino_BMI270_BMM150.h>

#include <TensorFlowLite.h>

#include "tensorflow/lite/micro/micro_error_reporter.h"
#include "tensorflow/lite/micro/micro_interpreter.h"
#include "tensorflow/lite/micro/micro_mutable_op_resolver.h"
#include "tensorflow/lite/schema/schema_generated.h"
#include "tensorflow/lite/version.h"

// Number of samples in the moving-average window. It must be a power of two
// because the ring-buffer index wraps with a bitwise AND against BUFFER_MASK.
constexpr int WINDOW_SIZE = 8;
constexpr int BUFFER_MASK = WINDOW_SIZE - 1;
static_assert(WINDOW_SIZE > 0 && (WINDOW_SIZE & BUFFER_MASK) == 0,
              "WINDOW_SIZE must be a power of two");

float buffer[WINDOW_SIZE];     // Ring buffer holding the values in the window.
int head = 0;                  // Index of the slot overwritten next.
float running_sum = 0.0f;      // Sum of everything currently in buffer.
float average = 0.0f;          // running_sum / WINDOW_SIZE.
float scaled_average = 0.0f;   // average standardized with the constants below.

/* From model_development.ipynb
  Scaling Parameters: Mean: 0.0636779359430605, Std: 0.126740584005968
  These must be updated whenever the model is retrained.
*/
constexpr float training_mean = 0.0636779359430605f;
constexpr float training_std = 0.126740584005968f;

// Working memory handed to the TensorFlow Lite Micro interpreter.
constexpr int kTensorArenaSize = 30 * 1024;
alignas(16) uint8_t tensor_arena[kTensorArenaSize];

tflite::ErrorReporter* error_reporter = nullptr;
const tflite::Model* model = nullptr;
tflite::MicroInterpreter* interpreter = nullptr;

// Indexed by class id. Training maps "N" -> 0 and "Y" -> 1, so the table
// follows that order.
constexpr int label_count = 2;
const char* labels[label_count] = {"N", "Y"};

// Model bytes and length, defined in the generated model source file.
extern const unsigned char model_data[];
extern const int model_data_len;

void setup() {
  // Start serial
  Serial.begin(9600);
  while (!Serial);

  Serial.println("Started");

  // Start IMU
  if (!IMU.begin()) {
    Serial.println("Failed to initialized IMU!");
    while (1);
  }

  static tflite::MicroErrorReporter micro_error_reporter;
  error_reporter = &micro_error_reporter;

  // Map the model into a usable data structure. This doesn't involve any
  // copying or parsing, it's a very lightweight operation.
  model = tflite::GetModel(model_data);
  if (model->version() != TFLITE_SCHEMA_VERSION) {
    TF_LITE_REPORT_ERROR(error_reporter,
                         "Model provided is schema version %d not equal "
                         "to supported version %d.",
                         model->version(), TFLITE_SCHEMA_VERSION);
    return;
  }

  static tflite::MicroMutableOpResolver<2> micro_op_resolver;  // NOLINT
  micro_op_resolver.AddFullyConnected(); // Dense Layer
  micro_op_resolver.AddLogistic(); // Sigmoid is considered a logistic function

  // Build an interpreter to run the model with.
  static tflite::MicroInterpreter static_interpreter(
      model, micro_op_resolver, tensor_arena, kTensorArenaSize, error_reporter);
  interpreter = &static_interpreter;

  // Allocate memory from the tensor_arena for the model's tensors.
  interpreter->AllocateTensors();

  // Get model input tensor
  TfLiteTensor* model_input = interpreter->input(0);

  /* From model_validation.ipynb: 
    Input shape expected by TFLite: [1 1]
    Input dtype expected by TFLite: <class 'numpy.float32'>
    Output shape: [1 1]
  */
  if ((model_input->dims->size != 2) ||
      (model_input->dims->data[0] != 1) ||
      (model_input->dims->data[1] != 1) ||
      (model_input->type != kTfLiteFloat32)) {
    TF_LITE_REPORT_ERROR(error_reporter,"Bad input tensor parameters in model");
    return;
  }
  TfLiteTensor* model_output = interpreter->output(0);
  if ((model_output->dims->size != 2) ||
      (model_output->dims->data[0] != 1) ||
      (model_output->dims->data[1] != 1) ||
      (model_output->type != kTfLiteFloat32)) {
    TF_LITE_REPORT_ERROR(error_reporter, "Bad output tensor parameters in model");
    return;
  }
}

void loop() {
  float ax, ay, az;

  if (IMU.accelerationAvailable()) {
    IMU.readAcceleration(ax, ay, az);
  }
 
  running_sum -= buffer[head];
  buffer[head] = ax * ax;
  running_sum += buffer[head];
  head = (head + 1) & BUFFER_MASK;
  average = running_sum / WINDOW_SIZE;
  scaled_average = (average - training_mean) / training_std;

  // Pass to the model and run the interpreter
  TfLiteTensor* model_input = interpreter->input(0);
  model_input->data.f[0] = scaled_average;  
  
  TfLiteStatus invoke_status = interpreter->Invoke();
  if (invoke_status != kTfLiteOk) {
    TF_LITE_REPORT_ERROR(error_reporter, "Invoke failed");
    return;
  }
  TfLiteTensor* output = interpreter->output(0);

  // Parse and interpret the model output

  float probability = output->data.f[0];
  const int predicted_class = (probability >= 0.5f) ? 1 : 0;

  char line[30];
  snprintf(line,sizeof(line),"%.3f|%.3f|%s", ax, average, predicted_class == 1 ? "Y" : "N");
  Serial.println(line);
  delay(10);
}
On-board prediction