1. THE KERNEL AT A GLANCE

1.1. Design Approach

RK0 was designed from scratch to make worst-case execution time (WCET) as straightforward to reason about as possible. The program makes execution progress only for well-defined reasons.

1.1.1. Architecture

The layered architecture can be split — roughly — into two: a top and a bottom layer. On the top, the Executive manages the resources needed by the application.

On the bottom, the Low-level Scheduler works as a software extension of the CPU.

Together, they implement the Task abstraction. This primitive is the Concurrency Unit and follows the Thread model. A Task is a thread.


In systems design jargon, the Executive enforces policy (what should happen). The Low-level Scheduler provides the mechanism (how it gets done). The services are the primitives that gradually translate policy decisions into concrete actions executed by the Scheduler.

RK0’s goal is determinism on low-end devices. Its multitasking engine does not split user space from kernel space. Tasks execute in privileged mode and use a dedicated process stack pointer, distinct from the system stack. The rationale:

  • Application tasks are not unknown entities at run time.

  • Implementing system calls as traps either forces a non-preemptive kernel, or increases complexity in critical paths, degrading determinism.

  • Relying on the ARMv6/7-M MPU decreases memory usage efficiency and introduces latency on control paths. It does not fit RK0’s deterministic execution model.

1.1.2. Programming with RK0

The current API is found here:
RK0 API.

Every service in RK0 solves a different problem. While some overlap is unavoidable, they are kept as orthogonal as possible.

The 'meaning' of each service is described here:
Service Semantics

1.1.3. Suitable Applications

Given the architecture, RK0 targets applications with the following characteristics:

  1. They are designed to handle particular devices in which real-time responsiveness is imperative.

  2. Applications and middleware may be implemented alongside appropriate drivers.

  3. Drivers may even include the application itself.

  4. Untested programs are not loaded: after the software has been tested, it can be assumed reliable.

1.2. Kernel Services

RK0 has Core Services (always enabled) and optional services (enabled by configuration).

Core Services:

  • Scheduler

  • Partition Memory Allocator

  • Sleep Timers (with 3 different policies)

  • Task’s Event Register

  • Task’s Mail

Optional Services:

  • Application Timer (Callouts)

  • Sleep Queues (for Monitor-like constructs)

  • Counting/Binary Semaphores

  • Mutex Locks (with transitive priority inheritance)

  • Message Queues

  • Channels (for client-server procedure calls)

  • Most-Recent Message Protocol (MRM) (1:N lock-free, last message)

When compiled with only Core Services, one gets a functional Executive in less than ~3 KB of ROM.

2. Scheduler

Aside from design details discussed later, the RK0 scheduler is a priority-based, preemptive scheduler supporting 32 priority levels. Quite standard. The major difference from other kernels is that it deliberately has no built-in time-slice.

2.1. Scheduler policy

A task assumes one of 3 states: RUNNING, READY, or WAITING.

Knowing that the READY tasks are kept in a table of FIFO queues, where each row corresponds to a priority:

The highest priority ready task is at the head of highest priority non-empty ready queue.

We can outline the scheduler policy:

  • A task must switch to the READY state before being eligible for scheduling.

  • After a task is dispatched, it keeps RUNNING until it is preempted by a higher priority task, blocks, or yields.

  • A yield will only switch tasks if there is another task with equal or higher priority that is ready.

  • A task will switch from RUNNING to READY if yielding or if being preempted by a higher priority task. Otherwise it can only go to a WAITING state, and eventually switch back to READY.

  • When a task is preempted by a higher priority task, it switches from RUNNING to READY and is placed back on the head position of its Ready Queue. This means that it will be resumed as soon as it is the highest priority ready task again.

  • On the contrary, if a task yields, it tells the scheduler it has completed its cycle. Then, it will be enqueued on the ready queue tail - the last queue position.

  • When a task waits it is suspended until a condition is satisfied.

  • When the condition is satisfied, it switches from WAITING to READY, and is enqueued on the tail.

  • So, tasks with the same priority cooperate by either yielding or waiting.

  • If a task never yields or waits, other tasks with the same or lower priority will starve.

  • Finally, tasks with the same priority are initially placed on the Ready Queue associated with that priority in the order they are created.

RK0 can handle context switching with an extended frame when a floating-point co-processor is available. This must be signalled at compile time by defining the symbol __FPU_PRESENT=1.

2.1.1. Scheduler Design Internals

A notable scheduler characteristic is constant-time complexity (O(1)) with low latency. This was achieved by carefully composing the data structures and an efficient 'choose-next' algorithm, detailed below.

2.1.1.1. Task Control Block

Every primitive is associated to a data structure we refer to as its Control Block. A Task Control Block is a record for stack, resources, and time management. The table below partially represents a Task Control Block (as this document is live, this might not reflect the exact fields of the current version).

Task Control Block

Task name

Task ID

Status

Assigned Priority

Effective Priority

Saved Stack Pointer

Stack Address

Stack Size

Last wake-time

Next wake-time

Time-out Flag

Preemption Flag

Owned Resources List

Waiting Resources List

Event Register Control Block

Mail Slot

Timeout List Node

TCB List Node

Tasks are static: they are not created (or destroyed) at runtime. There is no fork or join.


The scheduler rules, not the heap.

RK0 tasks are static.

It’s a design decision rooted in real-time correctness.

Besides, application-specific system software does not need to treat tasks as 'unknown' objects.

The wins:

  • A memory layout the systems programmer knows.

  • No alignment traps.

  • Link-time visibility:

    • Each task’s stack is a named symbol in the linker map.

    • You can inspect and verify the memory layout before flashing.

    • A simple objdump reveals all stack allocations — that’s peace of mind.

2.1.1.2. Task Queues

The backbone of the queues where tasks wait for their turn to run is a circular doubly linked list: removing an item from a doubly linked list takes O(1), provided we do not need to search for it. As the kernel knows each task's address, adding and removing are always O(1). A singly linked list cannot achieve O(1) removal of an arbitrary node.

2.1.1.3. Ready Queue Table

Another design choice to achieve O(1) is the global ready queue, which is a table of FIFO queues—each queue dedicated to a priority—and not a single ordered queue. So, enqueuing a ready task is always O(1). Given the sorting needed, the time complexity would be O(n) if tasks were placed on a single ready queue.

2.1.1.4. Waiting Queues

The scheduler does not have a unique waiting queue. Every kernel object that can block a task has an associated waiting queue. Because these queues are a scheduler component, they follow a priority discipline: the highest priority task is dequeued first, always.

When an event capable of switching tasks from WAITING to READY happens, one or more tasks (depending on the mechanism) are then placed on the ready queue associated with their priority. Now they are waiting to be picked by the scheduler: that is the definition of READY.

2.1.1.5. The choose-next algorithm

As the ready queue table is indexed by priority - the index 0 points to the queue of ready tasks with priority 0, and so forth, and there are 32 possible priorities - a 32-bit integer can represent the state of the ready queue table. It is a BITMAP:

The BITMAP update happens whenever:

(1a) A task is readied: BITMAP |= (1U << task->priority);
(1b) An empty READY QUEUE becomes non-empty: BITMAP |= (1U << queueIndex);
(2)  A READY QUEUE becomes empty: BITMAP &= ~(1U << queueIndex);
EXAMPLE:

  Ready Queue Index :     (6)5 4 3 2 1 0
          Not empty :      1 1 1 0 0 1 0
                           ------------->
                 (LOW)  Effective Priority  (HIGH)
In this case, the scenario is a system with 7 task priority levels. The queues for priorities 6, 5, 4, and 1 are not empty.

Having the Ready Queue Table bitmap, we find the highest priority non-empty ready queue as follows:

(1) Isolate the rightmost '1':

RBITMAP = BITMAP & -BITMAP (where -BITMAP is the two's complement: ~BITMAP + 1).

In this case:

                           [31]       [0]  :  Bit Position
                             0...1110010   :  BITMAP
                             1...0001110   : -BITMAP
                            =============
                             0...0000010   :  RBITMAP
                                     [1]

The rationale: for a number N, the two's complement -N flips all bits except the rightmost '1' (the +1 carry restores it). Hence N & -N yields a word that is all zeroes except for the least significant '1'.

(2) Extract the rightmost '1' position:

  • For ARMv7-M, we benefit from the CLZ instruction, which counts leading zeroes. Since the isolated word RBITMAP has a single '1', subtracting the leading-zero count from 31 yields the Ready Queue index.

unsigned __getReadyPrio(unsigned readyQBitmap)
{
    unsigned ret;
    __ASM volatile (
        "clz    %0, %1     \n"
        "neg    %0, %0     \n"
        "add    %0, %0, #31\n"
        : "=&r" (ret)
        : "r" (readyQBitmap)
        :
    );
    return (ret);
}

In the example above, CLZ returns 30, and 31 - 30 = 1 is the queue index.

  • For ARMv6-M there is no suitable hardware instruction. The algorithm is written in C and counts the trailing zeroes, which equal the index. Although it may vary with compiler settings, it takes ~11 cycles (note it is still O(1)):

/*
  De Bruijn multiply + LUT
  (Hacker's Delight)
*/

/* table is kept in a RAM section for efficiency */
static const unsigned readyPrioTbl[32] =
{
 0, 1, 28, 2, 29, 14, 24, 3, 30, 22, 20, 15, 25, 17, 4, 8,
 31, 27, 13, 23, 21, 19, 16, 7, 26, 12, 18, 6, 11, 5, 10, 9
};

RK_FORCE_INLINE static inline
unsigned __getReadyPrio(unsigned readyQBitmap)
{
    unsigned mult = readyQBitmap * 0x077CB531U;

    /* the top 5 bits are the LUT index */
    unsigned idx = (mult >> 27);

    /* LUT */
    unsigned ret = (unsigned)readyPrioTbl[idx];
    return (ret);
}

For the example above, mult = 0x2 * 0x077CB531 = 0x0EF96A62. The 5 leftmost bits are 00001, so the index is 1 and table[1] = 1.

During a context switch, the procedures to find the highest priority non-empty ready queue table index are as follows:

static inline RK_PRIO kCalcNextTaskPrio_(VOID)
{
    if (readyQBitMask == 0U)
    {
        return (idleTaskPrio);
    }
    readyQRightMask = readyQBitMask & -readyQBitMask;
    RK_PRIO prioVal = (RK_PRIO) (__getReadyPrio(readyQRightMask));
    return (prioVal);
}

VOID kSwtch(VOID)
{
    /* O(1) complexity */
    nextTaskPrio = kCalcNextTaskPrio_();

    RK_TCB* nextRunPtr = NULL;

    /* O(1) complexity */
    kTCBQDeq(&readyQueue[nextTaskPrio], &nextRunPtr);

    runPtr = nextRunPtr;
}

3. Timers and Delays

3.1. Busy delay

A busy-wait delay kBusyDelay(t) keeps a task spinning for t ticks. That is, the task does nothing but spin; it does not suspend or yield (though it can be preempted). This service is useful for simulating workloads.

Context switching is probably the most significant overhead on a kernel. The time spent on the System Tick handler contributes to much of this overhead.

Design Choice:

  • Timers are kept on a single list; only the head element needs to be updated using a delta-queue approach.

  • Application Timers that trigger callbacks are run on a deferred, non-preemptible system task.

Benefits:

  • Keep the overhead of updating timers as minimal as possible with the delta queue;

  • Deferring the Application Timer callbacks to a high-priority, non-preemptible system task meets the requested callback period while keeping the ability to track system ticks.

Timeout Node

Timeout Type

Absolute Interval (Ticks)

Relative Interval (Ticks)

Waiting Queue Address

Next Timeout Node

Previous Timeout Node

Every task is subject to the timer-triggered events described in this section. Every Task Control Block has a node for the timeout list. This list is doubly linked and treated as a delta-sequence.

A set Tset = {(T1,8), (T2,6), (T3,10)} started at relative time 0 is stored as the sequence Tseq = <(T2,6), (T1,2), (T3,2)>.

Thus, on every system tick only the head element of the list needs to be decreased: an O(1) decrement performed in the hardware interrupt for the System Tick.

Ordering the delta-queue is not O(1); it is O(n). However, the decrement happens on every SysTick interrupt, while the ordering happens only when a new node is added to the list.

3.2. Sleep Timers

There are three sleep primitives in RK0 — they behave differently.

3.2.1. Sleep Delay

sleepdelay() (aliased as sleep()) puts a task to sleep for exactly t ticks on every call, no matter when the last call happened.

Example:

VOID Task1(VOID* args)
{
    RK_UNUSEARGS
    UINT count = 0;
    while (1)
    {

        logPost("Task1: sleep");
        kSleep(300);
        /* wake here */
        count += 1U;
        if (count >= 5)
        {
            kDelay(25); /* spin */
            count=0;
            /* every 5 activations there will be a drift */
        }
    }
}

Output:

0 ms :: Task1: sleep
300 ms :: Task1: sleep  <-- +300
600 ms :: Task1: sleep  <-- +300
900 ms :: Task1: sleep  <-- +300
1200 ms :: Task1: sleep <-- +300
1525 ms :: Task1: sleep <-- +325
1825 ms :: Task1: sleep <-- +300
2125 ms :: Task1: sleep <-- +300
2425 ms :: Task1: sleep
2725 ms :: Task1: sleep
3050 ms :: Task1: sleep
3350 ms :: Task1: sleep
3650 ms :: Task1: sleep
3950 ms :: Task1: sleep
4250 ms :: Task1: sleep
4575 ms :: Task1: sleep

3.2.2. Compensated Sleep Delays

These are suspensions that recompute the time considering the drift between calls. They are typically used to create periodic tasks with explicit periods. The general pattern is:

VOID Task(VOID *args)
{
    <initialisation>

    while(1)
    {
        <periodic code>; /* this has an execution time */
        scheduleNext(PERIOD); /* cycle is finished: compute delay to keep PERIOD */
    }
}

The RMS algorithm assumes all tasks share a common phase grid: the instants at which they are made READY, i.e., become eligible to be scheduled. This means that from the very first activation, the elapsed time is already taken into account when computing the next release.

Nevertheless, it is common for kernels to provide sleep primitives that use a local anchor time, normally set in the <initialisation> code, working as waitUntil(&anchor, PERIOD). RK0 provides both; they suit different cases, as shown below.

3.2.2.1. Periodic Release Sleep

sleeprelease(P) delays a task so it is released at a periodic rate. The sleep time is recalculated by the kernel on every call.

If the task wakes late by N ticks with 0 < N < P, the kernel compensates by scheduling the next wake earlier (shortening the next sleep) so that over two periods the phase is preserved:

Say a task is expected to return from its k-th sleep at Tk+1 = Tk + P [ticks]. If the task is actually resumed at Tk+1 = Tk + P + N, upon detecting this drift the kernel sets Tk+2 = Tk+1 + P - N, for N < P.

This can be rewritten as:

(Tk+2 = Tk + P + N + P - N) ←→ (Tk+2 - Tk = 2P)

Example:

VOID Task1(VOID* args)
{
    RK_UNUSEARGS
    UINT count = 0;
    while (1)
    {

       logPost("Task1 released.");

        count += 1U;
        if (count >= 5)
        {
            kDelay(25); /* spin */
            count=0;
        }

        kSleepRelease(300); /*P=300 ticks; tick=1ms*/


    }
}

Output:

.
.

/* R is release time */

1200 ms :: Task1: sleep periodic (R==4P)        (n)
1525 ms :: Task1: sleep periodic (5P<R<6P)       |
1800 ms :: Task1: sleep periodic (6P)            |
2100 ms :: Task1: sleep periodic (7P)            |
2400 ms :: Task1: sleep periodic (8P)            |
2700 ms :: Task1: sleep periodic (9P)            |
3025 ms :: Task1: sleep periodic (10P<R<11P)     |
3300 ms :: Task1: sleep periodic (11P)           |
3600 ms :: Task1: sleep periodic (12P)           |
3900 ms :: Task1: sleep periodic (13P)           |
4200 ms :: Task1: sleep periodic (14P)           |
4525 ms :: Task1: sleep periodic (15P<R<16P)     |
4800 ms :: Task1: sleep periodic (16P)          (m)  m-n=12
.                                              -----
.                                           Phase=3600=12xP
.

This mechanism is phase-locked. When the lateness is greater than or equal to P, it skips one or more releases to stay locked to the phase grid. In some sense, the period value can be seen as a deadline: if it is not met, the scheduler declines to run that activation.

sleeprelease() makes it easier to perform worst-case response-time analysis on periodic tasks.

A set of periodic tasks must have priorities assigned properly (highest request rate, highest priority: the lower the period, the higher the priority). For sleeprelease() this is mandatory given the common phase grid.

3.2.2.2. Sleep Until (local anchor base)

sleepuntil(anchor, period) is somewhat similar to sleeprelease(), but differs in two important aspects:

  • The reference used to calculate how long to suspend to keep the rate is local to each task. This means the time before the task is first dispatched is disregarded.

  • A release later than one period returns and runs immediately. It prioritises execution count within a time window, not the phase across releases.


The snippet below demonstrates how each mechanism handles lateness longer than one period:

/* Every 3rd call both tasks will add up a delay longer than the task's period */

VOID HTask(VOID* args) /* higher priority: Period is 300 ticks */
{
    RK_UNUSEARGS
    UINT count = 0;
    while (1)
    {

        logPost("Higher: begin\r\n");
        /* wake here */
        count += 1U;
        kDelay(5);
        if (count >= 3)
        {
            kSleep(400); /* suspend */
            count=0;
        }
        logPost("Higher: end\r\n");
        kSleepRelease(300);

    }
}


VOID LTask(VOID *args) /* lower priority: Period is 400 ticks */
{
    RK_UNUSEARGS
    RK_TICK anchor = kTickGet();
    UINT count=0;
    while (1)
    {

        logPost("Lower: begin\r\n");
        /* wake here */
        count += 1U;
        kDelay(5);
        if (count >= 3)
        {
            kSleep(500);
            count=0;
        }
        logPost("Lower: end\r\n");
        kSleepUntil(&anchor, 400);
    }
}

Output:

       0 ms :: Higher: begin
       5 ms :: Higher: end
       5 ms :: Lower: begin
      10 ms :: Lower: end
     300 ms :: Higher: begin
     305 ms :: Higher: end
     405 ms :: Lower: begin
     410 ms :: Lower: end

    /* H 3rd run, expected next at 900ms */
     600 ms :: Higher: begin


   /* L 3rd run, expected next at 1205 ms */
     805 ms :: Lower: begin

    /* H Drift: 1005ms - 600ms = 405 ms > 300ms */
    1005 ms :: Higher: end

    /* H released again @ next multiple of 300. */
    1200 ms :: Higher: begin
    1205 ms :: Higher: end

   /* L Drift: 1310ms - 805ms = 505 ms > 400ms */
    1310 ms :: Lower: end
    /* it runs again  immediately */
    1310 ms :: Lower: begin

One normally does not write code with periodic tasks expecting them to miss their rate. But in the field, a transient overload might cause it. If it does, choose the policy that best fits your task: preserve phase (skip) or preserve execution count.

3.3. Blocking Time-out

These are internal timers associated with blocking kernel calls, establishing an upper bound on the waiting time. When the time is up, the kernel call returns, indicating a timeout. This value is passed as a number of ticks.

When blocking is associated with a kernel object (other than the Task Control Block), the timeout node stores the address of the object's waiting queue, so the task can be removed from it if the time expires.

A kernel call is made non-blocking (try semantics) by passing RK_NO_WAIT: the function returns immediately if unsuccessful. The value RK_WAIT_FOREVER suspends a task indefinitely until the condition is satisfied. Timeout values above RK_MAX_PERIOD are invalid.

In practice, we often block either using RK_WAIT_FOREVER or do not block (try semantics, RK_NO_WAIT).

Use a bounded timeout only when you expect occasional misses and you know how to handle them.

Importantly, an ISR shall never block. Any blocking call from an ISR is invalid and triggers fault handling when error checking is enabled.

3.4. Callout Timers (Application Timers)

Timer Control Block

Option: Reload/One-Shot

Phase (Initial Delay)

Callout Function Pointer

Callout Argument

Timeout Node

These are Application Timers that will issue a callback when expiring. In addition to a callout function, an Application Timer receives an initial phase delay and a period and can choose to run once (one-shot) or auto-reload itself.

The callback runs within a non-preemptible System Task with priority 0, which makes the scheduler prioritise it over all other tasks. Callouts must be short and non-blocking, as they can cause high CPU contention.

Application Timers (with autoreload) keep track of delays between activations.

3.5. System Tick

A dedicated peripheral that generates an interrupt after a defined period provides the kernel time reference. For ARMv6/7M, this peripheral is the built-in SysTick, a 24-bit counter timer.

The 'housekeeping' accounts for global timer tracking and any tick-dependent condition that might change a task's status. The handler performs this housekeeping on every tick. If a task whose execution progress depended on time switches to READY, the routine returns requesting the scheduler. If an application timer is due, it signals the system task that runs the installed callback and any additional logic.

The recommended tick interval for RK0 is no less than 10ms.

4. System Tasks

System Tasks perform housekeeping and other kernel maintenance outside interrupt handlers.

Currently there are two: the Idle Task and the PostProcSysTask.

The PostProcSysTask executes Application Timer callbacks and work deferred from ISRs, such as broadcast-style flushes on Sleep Queues or Semaphores.

The Idle Task runs whenever there is no other ready task to dispatch; the CPU then enters low-power mode. If RK_CONF_MIN_PRIO is set to 31 (which is possible even if not all priorities are occupied, at a small memory overhead), the Idle Task is dispatched when the Ready bitmap is 0x00000000.

5. Memory Allocator

Memory Allocator Control Block

Associated Block Pool

Number of Blocks

Block Size

Number of Free Blocks

Free Block List

The standard C library malloc() leads to fragmentation and, partly because of that, is highly nondeterministic. Unless it is used only once, to allocate memory before starting up, it does not fit. But often we need to 'multiplex' memory amongst tasks over time, that is, to dynamically allocate and deallocate.

To avoid fragmentation, we use fixed-size memory blocks. A simple approach would be a static table marking each block as free or taken. With this pattern, you need to search for the next available block, if any; the search time varies, bounded by the maximum number of blocks, i.e., O(n). To optimise, we instead keep track of free blocks with a dynamic table: a linked list of addresses. Now we have O(1).

We use metadata to initialise the linked list: every free address holds the "next" free address value. All addresses are within the range of a pool of fixed-size blocks. This approach limits the minimum block size to the size of a memory address: 32 bits on our supported architectures.

Yet, this is the cheapest way to store metadata. If it were not stored in the free block itself, an extra 32-bit variable would be needed for each block, just so a block could be smaller than 32 bits.

Allocating memory at runtime is a major source of latency (1), nondeterministic behaviour (2), and footprint overhead (3).

Design choice: the allocator’s design achieves low-cost, deterministic, fragmentation-free memory management by using fixed-size word-aligned block sizes (1)(2) and embedding metadata within the memory blocks themselves (3).

Benefits: run-time memory allocation without its usual real-time drawbacks.

The kernel will always round up the block size to the next multiple of 4. Say the user creates a memory pool with 6-byte blocks; they will become 8-byte blocks.

5.1. How it works

When a routine calls alloc(), the address returned is the one the free list points to, say addr1. Before returning addr1 to the caller, we update the free list to point to the value stored within addr1 (say addr8 at that moment).

When a routine calls free(addr1), we overwrite whatever is in addr1 with the value the free list points to (if no other alloc() was issued, it would still be addr8), and addr1 becomes the free-list head again.

Allocating and deallocating fixed-size blocks using this structure and storing meta-data this way is as deterministic (O(1)) and economical as we can get for dynamic memory allocation.


6. Inter-Task Communication: Signals and Messages

Inter-Task Communication (ITC) refers to the mechanisms that enable tasks to coordinate/cooperate/synchronise by means of sending or receiving information that falls into two logical categories: Signals or Messages.

  • Signals: A Signal is either present or absent (it may or may not accumulate) and is represented as tokens. Its meaning is implicit.

  • Messages: A Message coordinates tasks and exchanges information at the same time. Unlike a Signal, each message may convey different information.

6.1. Semaphores

Semaphore Control Block

Counter (Unsigned Integer)

Maximum Value

Waiting Queue

Semaphores are public kernel objects for signalling and waiting on countable events. Unlike task events, any task can wait on or signal a semaphore.

A semaphore S is a nonnegative integer variable, apart from the operations it is subjected to. S is initialised to a nonnegative value. The two operations, called P and V, are defined as follows:

P(S): if S > 0 then S := S-1, else the process is suspended until S > 0.

V(S): if there are processes waiting, then one of them is resumed; else S := S+1.

(Dijkstra, 1968)

V() in RK0 semaphores maps to post() and P() to pend().

6.1.1. Counting Semaphore and Binary Semaphores

The typical use case for semaphores is as a "credit tracker": use pend() to consume a credit and post() to return a credit (for example, free slots in a queue). These are Counting Semaphores.

A Binary Semaphore is a counting semaphore with maximum value 1: the state is either available or unavailable.

Binary semaphores are often used for task-to-task or ISR-to-task synchronisation, and sometimes for mutual exclusion (with caveats discussed later).

6.1.2. Semaphores in RK0

To initialise a semaphore in RK0, provide two values: initial count and maximum count. When the counter is at maximum, post() does not increment it and returns RK_ERR_SEMA_FULL.

A Binary Semaphore is therefore created by setting maximum count to 1. A counting semaphore that is intended never to saturate can use UINT32_MAX.

Besides init(), post(), and pend(), query() inspects the current state: a non-negative value is the current count; a negative value is the number of tasks waiting.

The operation for flushing a semaphore (waking all pending tasks) was deprecated in V0.17.0. Now only Sleep Queues have wake/flush().

6.1.2.1. Producer-consumer problem with Binary and Counting Semaphores

Items are buffered within a memory region whose capacity is K items.

Thus: 0 < (Number of Inserted) – (Number of Extracted) < K.

Using semaphores the pattern is as follows:

  1. A semaphore with K tokens to track free slots, preventing producers from proceeding when there are no free slots.

  2. Another semaphore, initialised to 0 with a maximum of K tokens, tracking the number of items, not allowing consumers to proceed if there are no items.

  3. A 1-token semaphore so only one task manipulates the buffer at a time.

Bounded buffer: semaphore roles and critical section
/* a ring buffer of items */
#define BUFSIZ (K)
static ITEM_t buf[BUFSIZ]={0};
static UINT getIdx = 0U;
static UINT putIdx = 0U;
/* getIdx == putIdx could mean either FULL or EMPTY for a regular
circular buffer with wrap-around.
With semaphores the state is well defined.
*/

RK_SEMAPHORE  itemSema;
RK_SEMAPHORE  slotSema;
RK_SEMAPHORE  acquireSema;


VOID kApplicationInit(VOID)
{

  /*buffer is initialised empty */
    kSemaphoreInit
    (   &itemSema,
        0,   /* no item  */
        K    /*max items */
    );

    kSemaphoreInit
    (   &slotSema,
        K, /* K free slots */
        K  /* max slots */
    );

    /* and free */
    kSemaphoreInit
    (   &acquireSema,
        1, /* free to access */
        1  /* 1 max task allowed */
    );
}

VOID PutItem(ITEM_t* insertItemPtr)
{
    /* wait for room */
    kSemaphorePend(&slotSema, RK_WAIT_FOREVER);

    /* wait for availability */

    kSemaphorePend(&acquireSema,  RK_WAIT_FOREVER);
    buf[putIdx] = *insertItemPtr;
    putIdx += 1U; putIdx %= BUFSIZ;
    /* signal availability */
    kSemaphorePost(&acquireSema);

    /* signal item */
    kSemaphorePost(&itemSema);
}


 VOID GetItem(ITEM_t* extractItemPtr )
{

    /* wait for an item */
    kSemaphorePend(&itemSema, RK_WAIT_FOREVER);

    /* wait for availability */
    kSemaphorePend(&acquireSema,  RK_WAIT_FOREVER);

    *extractItemPtr = buf[getIdx];
    getIdx+=1U; getIdx %= BUFSIZ;

    /* signal availability */
    kSemaphorePost(&acquireSema);

    /* signal room */
    kSemaphorePost(&slotSema);
}

The solution above has Put() and Get() as blocking methods.

If the producer and the consumer run at different rates, they will eventually synchronise at the lower rate.

The numbers below are from a run with a buffer of 32 items (integers being incremented are the produced data).

The producer is twice as fast as the consumer. Initially, for every 2 insertions there is a single removal.

Put 59 <-
Put 60 <-
------
Got 30 ->
------
Put 61 <-
Put 62 <-
------
Got 31 ->
------
Put 63 <-
Put 64 <-
--------
Got 32  | ->
Put 65  . <-
       <x>[Full Queue, Producer blocks]
Got 33  | ->  [Consumer unblocks producer...]
Put 66  . <-
       <x>[Full Queue]
Got 34  | ->  [Consumer unblocks producer...]
Put 67  . <-
       <x>[Full Queue]
When two tasks running at different rates insert into and remove from the same buffer, and both operations are blocking, they will eventually run at the pace of the slower task.

6.2. Sleep Queue

Sleep Queue Control Block

Task Waiting Queue

An RK_SLEEP_QUEUE is stateless: unlike a wait() on a Semaphore, there is no test to decide whether to block; the task is explicitly switched to SLEEPING until another task issues a signal/wake() on that sleep queue.

A signal() wakes the highest priority task; among equals, the one enqueued first.

Different from Semaphores, Sleep Queues support waking several tasks at once via wake(n) — a broadcast.

A wake(n) wakes at most n tasks, if any. This provides some control over the always questionable overhead of broadcast signals. If n=0, it flushes the queue.

If broadcasting from an ISR, the operation is deferred to the PostProcessingTask.

A query() returns the number of waiting tasks.

A suspend() moves a READY task to a sleep queue, and its state switches to SLEEPING_SUSPENDED. This prevents the task from being scheduled.

Tasks in other states are not allowed as targets. A ready() does the opposite, moving a specific task from that sleep queue back to READY.

Sleep Queue names usually reflect the condition the waiters need or the action a wake/signal event triggers (for example, notFull, notEmpty, goWriters, goReaders).

6.3. Task Events (Event Flags)

Within Task Control Block

Event Register Value (ULONG)

Required Events (ULONG)

Satisfy Condition (ALL/ANY)

Each Task Control Block stores a 32-bit event register (one event per bit).

They are to be seen as an array of private binary semaphores. The mechanism is quite simple:

  • An eventSet(taskHandle, mask) performs a bitwise-OR of mask into the current Event Register. If the task is blocked waiting on a set of bits and the new value satisfies its condition, it switches to READY.

  • An eventGet(requiredBits, ALL/ANY, storeAddr, timeout) checks whether ALL or ANY of the required bits are present. If not, the task blocks, or returns immediately when using RK_NO_WAIT. If they are present, the requiredBits are always cleared. Before they are cleared, if storeAddr is not NULL, the current Event Register value is stored at that address for inspection. This is useful with the ANY condition, so the caller can check which bits were present and act accordingly.

  • An eventClear(taskHandle, mask) will clear the bits marked as 1 in mask.

  • An eventQuery(taskHandle, storeAddr) inspects the current value of a task's event register. In both Clear and Query, if taskHandle is NULL the API takes the caller as the target task.

For convenience there are macros encoding the bit positions as 32-bit numbers, e.g., RK_EVENT_1 equals 0x00000001, …​, RK_EVENT_6 equals 0x00000020; RK_EVENT_32 equals 0x80000000 and RK_ALL_EVENTS equals 0xFFFFFFFF.
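The set/clear/test semantics above can be sketched in plain C. This is an illustrative model of the bit operations only — the names (event_set, satisfied, and so on) are mine, not RK0 API:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define EVENT_1  (1UL << 0)   /* mirrors RK_EVENT_1  = 0x00000001 */
#define EVENT_6  (1UL << 5)   /* mirrors RK_EVENT_6  = 0x00000020 */
#define EVENT_32 (1UL << 31)  /* mirrors RK_EVENT_32 = 0x80000000 */

/* eventSet: bitwise-OR the mask into the register */
static uint32_t event_set(uint32_t reg, uint32_t mask)  { return reg | mask; }

/* eventClear: clear the bits marked 1 in mask */
static uint32_t event_clear(uint32_t reg, uint32_t mask) { return reg & ~mask; }

/* satisfy condition: ALL requires every required bit, ANY at least one */
static bool satisfied(uint32_t reg, uint32_t required, bool all)
{
    uint32_t hit = reg & required;
    return all ? (hit == required) : (hit != 0U);
}
```

A task blocked on EVENT_1 | EVENT_6 with the ALL condition is readied only when both bits land in its register; with ANY, either bit suffices.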

6.4. Mutex Lock

Mutex Control Block

Locked State (Boolean)

Owner

Protocol Flag (RK_NO_INHERIT / RK_INHERIT)

Waiting Queue

Mutex Node (list node within the owner TCB)

Some regions are critical and must not be executed by more than one task at once. Acquiring (lock()) a mutex before entering and releasing (unlock()) after leaving makes the region mutually exclusive.

A Mutex is a binary semaphore with ownership: once a task locks a mutex only that task can unlock it.

If a task tries to acquire a locked mutex, it switches to BLOCKED until the owner unlocks it. When released, the highest-priority waiter is dequeued first. Unlike semaphores, unlocking by a non-owner is invalid and rejected.

Mutexes are only for mutual exclusion; they are not signalling primitives.

PS: RK0 mutexes are non-recursive. Re-entrant locking of the same mutex returns RK_ERR_MUTEX_REC_LOCK and is considered a fault.
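The ownership and non-recursive rules can be modelled in a few lines of plain C. This is a behavioural sketch, not RK0 source; every name here is hypothetical:

```c
#include <assert.h>

/* toy mutex: tracks the locked state and the owner task id */
typedef struct { int locked; int owner; } mutex_model_t;

enum { ERR_SUCCESS = 0, ERR_WOULD_BLOCK = 1, ERR_NOT_OWNER = -1, ERR_REC_LOCK = -2 };

static int model_lock(mutex_model_t *m, int task)
{
    if (m->locked && m->owner == task)
        return ERR_REC_LOCK;     /* non-recursive: re-entrant lock is a fault */
    if (m->locked)
        return ERR_WOULD_BLOCK;  /* caller would switch to BLOCKED */
    m->locked = 1;
    m->owner  = task;
    return ERR_SUCCESS;
}

static int model_unlock(mutex_model_t *m, int task)
{
    if (!m->locked || m->owner != task)
        return ERR_NOT_OWNER;    /* unlocking by a non-owner is rejected */
    m->locked = 0;
    return ERR_SUCCESS;
}
```

The two rejection paths are exactly what distinguishes a mutex from a binary semaphore: only the owner may unlock, and the owner may not lock twice.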


6.4.1. Priority Inversion and PIP

Let TH, TM, and TL be three tasks with priority high (H), medium (M) and low (L), respectively. Say TH is dispatched and blocks on a mutex that 'TL' has acquired (i.e.: "TL is blocking TH").

If 'TM' does not need the resource, it will run and preempt 'TL'. And, by transitivity, 'TH'.

From now on, 'TH' has an unbounded waiting time, because any task with priority higher than 'L' that does not need the resource indirectly prevents it from being unblocked — awful.

The Priority Inheritance (PI) Protocol avoids this unbounded waiting. It is characterised by an invariant, simply put:

PIP Invariant: At any instant a Task assumes the highest priority among the tasks it is blocking.

If employed in the situation described above, task TM cannot preempt TL, whose effective priority would have been raised to 'H'.

It is straightforward to reason about this when you consider the scenario of a single mutex.

When locks nest, the protocol also needs to be:

  • Transitive: if T1 blocks T2 and T2 blocks T3, the highest priority (T3) must propagate back to T1 through T2.

This is the hard part of a correct implementation: updates must preserve the invariant across changing wait chains and multiple mutexes.
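Assuming a lower number means a higher priority, the transitive propagation can be sketched as a plain-C model. This is illustrative only — RK0 updates effective priorities incrementally along wait chains rather than recomputing them, and all names below are mine:

```c
#include <assert.h>

#define NTASKS 3

/* nominal priorities: lower number = higher priority (TH, TM, TL) */
static const int nominal[NTASKS] = {1, 2, 3};

/* blocked_on[i]: index of the task currently blocking task i, or -1 */
static const int blocked_on[NTASKS] = {1, 2, -1}; /* TH waits on TM, TM on TL */

/* effective priority of 'task': the best (numerically lowest) nominal
   priority among itself and every task transitively blocked behind it */
static int effective(int task)
{
    int best = nominal[task];
    for (int i = 0; i < NTASKS; i++)
    {
        /* walk the wait chain of task i: does it (transitively) wait on 'task'? */
        for (int j = blocked_on[i]; j != -1; j = blocked_on[j])
        {
            if (j == task && nominal[i] < best)
            {
                best = nominal[i];
            }
        }
    }
    return best;
}
```

With the chain TH → TM → TL, the model yields effective priority H for all three tasks, which is precisely the PIP invariant stated above.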

This blog shows an even more intricate case of priority inversion handling.

Below, a PIP-only case in which locks nest:

/* Task1 has the Highest nominal priority */
/* Task2 has the Medium nominal priority */
/* Task3 has Lowest nominal priority */

/* Note Task3 starts as 1 and 2 are delayed */

RK_DECLARE_TASK(task1Handle, Task1, stack1, STACKSIZE)
RK_DECLARE_TASK(task2Handle, Task2, stack2, STACKSIZE)
RK_DECLARE_TASK(task3Handle, Task3, stack3, STACKSIZE)


RK_MUTEX mutexA;
RK_MUTEX mutexB;

VOID kApplicationInit(VOID)
{
	K_ASSERT(!kCreateTask(&task1Handle, Task1, RK_NO_ARGS, "Task1", stack1, \
		STACKSIZE, 1, RK_PREEMPT));
	K_ASSERT(!kCreateTask(&task2Handle, Task2, RK_NO_ARGS, "Task2", stack2, \
		STACKSIZE, 2, RK_PREEMPT));
	K_ASSERT(!kCreateTask(&task3Handle, Task3, RK_NO_ARGS, "Task3", stack3, \
		STACKSIZE, 3, RK_PREEMPT));

/* mutexes initialised with priority inheritance enabled */
	kMutexInit(&mutexA, RK_INHERIT);
	kMutexInit(&mutexB, RK_INHERIT);
}



VOID Task3(VOID *args)
{
	RK_UNUSEARGS
	while (1)
	{
		printf("@ %lums: [TL] Attempting to LOCK 'A' | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioNominal);

		kMutexLock(&mutexA, RK_WAIT_FOREVER);

		printf("@ %lums: [TL] LOCKED 'A' (in CS) | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioNominal);

		kDelay(60); /* <-- important */

		printf("@%lums: [TL] About to UNLOCK 'A' | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioNominal);

		kMutexUnlock(&mutexA);

		printf("--->");
		printf("@%lums: [TL] Exit CS | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioNominal);

		kSleep(4);
	}
}

VOID Task2(VOID *args)
{
	RK_UNUSEARGS
	while (1)
	{
		kSleep(5);

		printf("@%lums: [TM] Attempting to LOCK 'B' | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioNominal);
		kMutexLock(&mutexB, RK_WAIT_FOREVER);

		printf("@%lums: [TM] LOCKED 'B', now trying to LOCK 'A' | Eff: %d | Nom: %d\r\n",
			   kTickGet(), runPtr->priority, runPtr->prioNominal);
		kMutexLock(&mutexA, RK_WAIT_FOREVER);

		printf("@%lums: [TM] LOCKED 'A' (in CS) | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioNominal);
		kMutexUnlock(&mutexA);

		printf("@%lums: [TM] UNLOCKING 'B' | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioNominal);

		kMutexUnlock(&mutexB);

		printf("--->");

		printf("@%lums: [TM] Exit CS | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioNominal);
	}
}

VOID Task1(VOID *args)
{
	RK_UNUSEARGS
	while (1)
	{
		kSleep(2);

		printf("@%lums: [TH] Attempting to LOCK 'B'| Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioNominal);

		kMutexLock(&mutexB, RK_WAIT_FOREVER);

		printf("@%lums: [TH] LOCKED 'B' (in CS)  | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioNominal);

		kMutexUnlock(&mutexB);

		printf("--->");

		printf("@%lums: [TH] Exit CS | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioNominal);
	}
}

Result and comments:

>>>> TL locks 'A'. Higher priority tasks are sleeping. <<<<

@ 14720ms: [TL] Attempting to LOCK 'A' | Eff: 3 | Nom: 3
@ 14720ms: [TL] LOCKED 'A' (in CS) | Eff: 3 | Nom: 3

@14721ms: [TM] Attempting to LOCK 'B' | Eff: 2 | Nom: 2

>>>> TM acquires 'B' and is blocked by TL on 'A'. TL inherits TM's  priority. <<<<

@14721ms: [TM] LOCKED 'B', now trying to LOCK 'A' | Eff: 2 | Nom: 2

>>>> TH will be blocked by TM on 'B': <<<<

@14722ms: [TH] Attempting to LOCK 'B'| Eff: 1 | Nom: 1

>>>> TM inherits TH's priority. TL inherits TH's priority via TM. <<<<

@14780ms: [TL] About to UNLOCK 'A' | Eff: 1 | Nom: 3

>>>> Upon unlocking 'A', TL is preempted by TM. It means TL's priority has been restored, as it is no longer blocking a higher priority task. <<<<


>>>> Now TM acquires 'A' <<<<

@14780ms: [TM] LOCKED 'A' (in CS) | Eff: 1 | Nom: 2

>>>> After releasing 'A', but before releasing 'B', TM's priority is still '1', as it is blocking TH while holding 'B'. <<<<

@14780ms: [TM] UNLOCKING 'B' | Eff: 1 | Nom: 2

>>>> Upon unlocking 'B' TM is preempted by TH. (TM's priority has been restored.) <<<<

@14780ms: [TH] LOCKED 'B' (in CS)  | Eff: 1 | Nom: 1

>>> RESULT: even though priority inversion occurred, priority inheritance ensured tasks leave the nested locks ordered by their nominal priority. <<<

--->@14780ms: [TH] Exit CS | Eff: 1 | Nom: 1
--->@14780ms: [TM] Exit CS | Eff: 2 | Nom: 2
--->@14780ms: [TL] Exit CS | Eff: 3 | Nom: 3

Importantly, the worst-case blocking time is bounded by the time the lowest-priority task holds a lock (60 ms in the example: 14720ms → 14780ms).

As each priority update checks the waiting queue of every mutex the owner holds, the time complexity is linear in the number of owned mutexes. Typically, no task ever holds more than a few mutexes. Still, one should not nest locks unless needed.

6.4.2. Mutexes vs Binary Semaphores

There is (or used to be) a lot of fuss about whether binary semaphores are appropriate as locks. As a practical guideline, if all tasks sharing the resource have the same priority, a binary semaphore can be appropriate — and it is considerably faster. It all depends on the case.

The drawback is the lack of ownership: any task can accidentally release the resource. On a large codebase, this can become a real problem. Nonetheless, this is a problem for semaphores in general.

For tasks with different priorities, binary semaphores should never be considered for mutual exclusion unless priority inversion is not a problem (how?).

A counting semaphore initialised to 1 is riskier still. Besides priority inversion, if the count ever rises above 1, mutual exclusion is lost and multiple tasks can enter the critical section at once.
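A toy model in plain C shows how a single stray post breaks exclusion (illustrative only, not RK0 code; names are mine):

```c
#include <assert.h>

/* toy counting semaphore: pend succeeds (returns 1) if count > 0,
   otherwise the caller would block (returns 0) */
static int sem_pend(int *count)
{
    if (*count > 0)
    {
        (*count)--;
        return 1;
    }
    return 0;
}

static void sem_post(int *count) { (*count)++; }
```

With the count initialised to 1, one extra post (which no ownership rule prevents) raises the count to 2, and two tasks can pend successfully at the same time.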

6.5. Scheduler Lock

Often, we need a task to perform operations without being preempted. A mutex serialises access to a code region but does not prevent a task from being preempted while operating on data. Depending on the case, this can lead to inconsistent data state.

An aggressive way is to disable interrupts globally. For kernel services it is often the only way to keep data integrity. At the application level, it is feasible for very short operations and/or when data must be protected from interrupts altogether.

A less aggressive approach is to make the task non-preemptible with kSchLock() before entering the critical region and kSchUnlock() when leaving. This way, interrupts are still being sensed, and even higher-priority tasks might switch to a ready state, but the running thread will not be preempted.

The priority inversion it potentially causes is bounded. If a higher-priority task is readied while the scheduler is locked, the context switch happens immediately after unlocking.

Note that locking/unlocking the scheduler itself disables global interrupts for the time it takes to increment/decrement a counter; therefore, if your atomic operation is as short as that (3 to 4 cycles), disabling/enabling global interrupts is the better alternative.
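A minimal model of that nesting counter, assuming kSchLock()/kSchUnlock() nest via a counter as described above (the names below are mine, not RK0 API):

```c
#include <assert.h>
#include <stdbool.h>

/* toy model of a nestable scheduler lock: the counter is bumped with
   interrupts briefly disabled; preemption is allowed only at zero */
static int schLockCount = 0;

static void sch_lock(void)    { schLockCount++; }

static void sch_unlock(void)
{
    if (schLockCount > 0)
    {
        schLockCount--;
        /* when the count reaches zero, a pending higher-priority task
           would be dispatched immediately */
    }
}

static bool preemptible(void) { return schLockCount == 0; }
```

Nesting means a helper function may lock the scheduler on its own without clobbering an outer critical region; only the outermost unlock re-enables preemption.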


To add to the discussion: when two threads need to 'read-modify-write' the same data, a lock-free mechanism is the LDREX/STREX exclusive operations of ARMv7-M (or, more generally, C11 atomics). They do not avoid preemption, and on ARMv7-M in particular, if the data is touched by an ISR before the store-exclusive concludes, the exclusivity is lost and the sequence must retry. These are typically used for multi-core spin-locking.


6.6. Conditional Waiting

Task Events and semaphores work by atomically updating state and testing predicates that control execution flow (for example, pend on a semaphore with count 0 blocks the caller).

A critical region guarded by a lock is either free or taken. What if we need to wait on a richer condition? We express the condition on shared variables and check it within the critical region. If the condition is not satisfied, we must block until it is — but we need to release the lock before sleeping; otherwise, the task that could change the condition and wake us would be blocked by the lock we are holding. Hence, the sleeping task releases the lock and sleeps in one atomic operation.

If we create a data structure with state variables, a Mutex lock and the Sleep Queues associated with each condition, plus a set of operations acting over this structure, we have an ADT. This ADT is called a Monitor.

6.6.1. Monitor Invariants

A Monitor needs to respect two invariants:

  1. a single task can be active within a monitor;

  2. only the active task within a monitor can check or change its state.

Given the above invariants, how do we keep a single active task within a monitor if the active task is the one waking other tasks?

This comes down to the Signalling Discipline.

6.6.2. Signalling Discipline

At any moment a single task can be active within a monitor. When the sleeping task is signalled, there are 3 common disciplines to follow: signal-and-leave (Hansen), signal-and-wait (Hoare) or signal-and-continue (Mesa).

Arguably, the most common is signal-and-continue: rather than leaving or suspending itself, the active task may continue within the monitor. That is possible because the active task holds a lock the woken task must acquire to enter. Upon leaving, the active task releases the lock.

The major implication is that, by the time the woken task enters the monitor, the condition it was waiting for might no longer hold. It sounds odd, because a Monitor is about encapsulating a conditional critical region so no outsider changes its state. But a flush, a bad design — or a preemption anomaly — can violate that.

Mesa Monitors have a typical test-loop pattern:

 --- snippet ---
 while (condition is FALSE)
 {
    /*unlock-wait sequence:*/

     ATOMIC_BEGIN

     unlock(mutex);   /* the atomic unlock-sleep referred to earlier */
     sleep(condition);

     ATOMIC_END

     lock(mutex);
     /* when waking, the while clause is tested again */
 }
 --- snippet ---

The figure below shows a producer-consumer problem if implemented on a Monitor-idiom.

                            Entry Queue [][][]–>
         .............................................
         -------------------
         | Internal States |
         |-----------------|
         | Total buffers   |
         | Taken buffers   |
         -------------------  |[Active Task]|
         | Sleep Queues    |
         |Sleeping for Data|
         |Sleeping for Room|
         -------------------
                              -> [][][] Exit Queue
        ...............................................

6.6.3. Condition Variable Model

The Condition Variable Model allows a task to wait within a monitor-construct and if active, operate using signal(), wait() and broadcast() respecting the monitor invariant.

Sleep Queues are actually the canonical Condition Variable introduced by Hoare:

Note that a condition "variable" is neither true nor false; indeed, it does not have any stored value accessible to the program. In practice, a condition variable will be represented by an (initially empty) queue of processes which are currently waiting on the condition; but this queue is invisible both to waiters and signallers. This design of the condition variable has been deliberately kept as primitive and rudimentary as possible (…​)

(Monitors: An Operating System Structuring Concept, Hoare, 1974)

RK0 follows pthreads condition variable semantics, aligned with Mesa monitors.

The difference is that there is no standalone CondVar primitive. The programmer combines Sleep Queues and Mutexes and uses these helpers:

  • kCondVarWait(&sleepq, &mutex, timeout)

  • kCondVarSignal(&sleepq)

  • kCondVarBroadcast(&sleepq)

kCondVarWait is the real helper. With it, a Mesa testing loop reduces to:

  while(!condition)
  {
    kCondVarWait(&condQueue, &monitorLock, timeout);
  }

If you need a monitor policy different from Mesa, you can build it from the same primitives.

kCondVarWait, kCondVarSignal, and kCondVarBroadcast are task-context APIs and cannot be called from ISRs.
6.6.3.1. Usage Example: Synchronisation Barrier

A given number of tasks must reach a point in the program before all can proceed, so every task calls barrWait(&barrier) to catch up with the set of tasks it must synchronise with.

The last task entering the barrier will broadcast a signal to all tasks waiting for the wake condition.

At any moment within a Monitor a single task is RUNNING (which is an invariant of the kernel); all other tasks within the monitor are either SLEEPING (for some condition) or BLOCKED (on the mutex).

(Interestingly, a barrier solves the need for mutual coincidence, the very opposite of mutual exclusion.)

/* Synchronisation Barrier */

typedef struct
{
    RK_MUTEX lock;
    RK_SLEEP_QUEUE allSynch;
    UINT count; /* number of tasks in the barrier */
    UINT round; /* increased every time all tasks synch */
    UINT nRequired; /* number of tasks required */
} Barrier_t;

VOID BarrierInit(Barrier_t *const barPtr, UINT nRequired)
{
    kMutexInit(&barPtr->lock, RK_INHERIT);
    kSleepQueueInit(&barPtr->allSynch);
    barPtr->count = 0;
    barPtr->round = 0;
    barPtr->nRequired = nRequired;

}

VOID BarrierWait(Barrier_t *const barPtr)
{
    UINT myRound = 0;
    kMutexLock(&barPtr->lock, RK_WAIT_FOREVER);

    /* save round number */
    myRound = barPtr->round;
    /* increase count on this round */
    barPtr->count++;

    if (barPtr->count == barPtr->nRequired)
    {
        /* reset counter, inc round, broadcast to sleeping tasks */
        barPtr->round++;
        barPtr->count = 0;
        kCondVarBroadcast(&barPtr->allSynch);
    }
    else
    {
        /* wait until the round advances; guards against spurious wake-ups */
        while ((UINT)(barPtr->round - myRound) == 0U)
        {
            RK_ERR err = kCondVarWait(&barPtr->allSynch, &barPtr->lock, RK_WAIT_FOREVER);
            K_ASSERT(err==RK_ERR_SUCCESS);
        }
    }
    kMutexUnlock(&barPtr->lock);
}
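The loop guard (UINT)(barPtr->round - myRound) == 0U relies on unsigned subtraction being well-defined modulo 2^32, so the test keeps working even when the round counter wraps around. A self-contained check (helper name is mine):

```c
#include <assert.h>
#include <stdint.h>

/* rounds elapsed between the saved and the current round counter;
   unsigned subtraction is defined modulo 2^32, so the result is
   correct even after the counter wraps from 0xFFFFFFFF to 0 */
static uint32_t rounds_elapsed(uint32_t now, uint32_t saved)
{
    return now - saved;
}
```

A signed comparison such as now > saved would misfire exactly at the wraparound point, which is why the barrier compares the unsigned difference against zero instead.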


#define N_REQUIRED 3

Barrier_t syncBarrier;

VOID kApplicationInit(VOID)
{

    K_ASSERT(!kCreateTask(&task1Handle, Task1, RK_NO_ARGS, "Task1", stack1, STACKSIZE, 2, RK_PREEMPT));
    K_ASSERT(!kCreateTask(&task2Handle, Task2, RK_NO_ARGS, "Task2", stack2, STACKSIZE, 3, RK_PREEMPT));
    K_ASSERT(!kCreateTask(&task3Handle, Task3, RK_NO_ARGS, "Task3", stack3, STACKSIZE, 1, RK_PREEMPT));
	BarrierInit(&syncBarrier, N_REQUIRED);
}
VOID Task1(VOID* args)
{
    RK_UNUSEARGS
    while (1)
    {
        kPuts("Task 1 is waiting at the barrier...\r\n");
        BarrierWait(&syncBarrier);
        kPuts("Task 1 passed the barrier!\r\n");
		kSleep(8);
    }
}

VOID Task2(VOID* args)
{
    RK_UNUSEARGS
    while (1)
    {
        kPuts("Task 2 is waiting at the barrier...\r\n");
        BarrierWait(&syncBarrier);
        kPuts("Task 2 passed the barrier!\r\n");
		kSleep(5);
	}
}

VOID Task3(VOID* args)
{
    RK_UNUSEARGS
    while (1)
    {
        kPuts("Task 3 is waiting at the barrier...\r\n");
        BarrierWait(&syncBarrier);
        kPuts("Task 3 passed the barrier!\r\n");
        kSleep(3);
	}
}
syncbarr
6.6.3.2. Usage Example: Readers Writers Lock

Several readers and writers share a piece of memory. Readers can concurrently access the memory to read; a single writer is allowed (otherwise, data would be corrupted).

When a writer finishes, it checks for waiting readers. If there are any, the writer flushes the readers' waiting queue; if not, it wakes a single writer, if any. When the last reader finishes, it signals a writer.

Every read or write operation begins with an acquire and finishes with a release.

PS: This RWLock implementation has a reader-preference policy: when a writer finishes, it flushes sleeping readers; only when the last reader finishes does it signal the writers' waiting queue.

/* RW-Lock */

/* a single writer is allowed if there are no readers */
/* several readers are allowed if there is no writer*/
typedef struct
{
	RK_MUTEX	 lock;
	RK_SLEEP_QUEUE	 writersGo;
	RK_SLEEP_QUEUE	 readersGo;
	INT			 rwCount; /* number of active readers if > 0 */
						  /* active writer if -1             */

}RwLock_t;

VOID RwLockInit(RwLock_t *const rwLockPtr)
{

	kMutexInit(&rwLockPtr->lock, RK_INHERIT);
	kSleepQueueInit(&rwLockPtr->writersGo);
	kSleepQueueInit(&rwLockPtr->readersGo);
	rwLockPtr->rwCount = 0;
}

/* A writer can acquire if  rwCount = 0 */
/* An active writer is indicated by rwCount = -1; */
VOID RwLockAcquireWrite(RwLock_t *const rwLockPtr)
{
	kMutexLock(&rwLockPtr->lock, RK_WAIT_FOREVER);
	/* if different than 0, there are either writers or readers */
	/* sleep to be signalled */
	while (rwLockPtr->rwCount != 0)
	{
	    kCondVarWait(&rwLockPtr->writersGo, &rwLockPtr->lock, RK_WAIT_FOREVER);
	    /* mutex is locked when waking up*/
	}
	/* woke here, set an active writer */
	rwLockPtr->rwCount = -1;
	kMutexUnlock(&rwLockPtr->lock);
}

/* a writer releases, waking up all waiting readers, if any */
/* if there are no readers, a writer can get in */
VOID RwLockReleaseWrite(RwLock_t *const rwLockPtr)
{
	kMutexLock(&rwLockPtr->lock, RK_WAIT_FOREVER);

	rwLockPtr->rwCount = 0; /* indicate no writers*/

	/* if there are waiting readers, flush */
	ULONG nWaitingReaders=0;
	kSleepQueueQuery(&rwLockPtr->readersGo, &nWaitingReaders);
	if (nWaitingReaders > 0)
	{
	    /* condVarBroadcast is just an alias for an event flush */
		kCondVarBroadcast(&rwLockPtr->readersGo);
	}
	else
	{
		/* wake up a single writer if any */
		kCondVarSignal(&rwLockPtr->writersGo);
	}
	kMutexUnlock(&rwLockPtr->lock);
}

/* a reader can acquire if there are no writers */
VOID RwLockAcquireRead(RwLock_t *const rwLockPtr)
{
	kMutexLock(&rwLockPtr->lock, RK_WAIT_FOREVER);
	/* if there is an active writer, sleep */
	while (rwLockPtr->rwCount < 0)
	{
	    kCondVarWait(&rwLockPtr->readersGo, &rwLockPtr->lock, RK_WAIT_FOREVER);
	    /* mutex is locked when waking up*/
	}
	/* increase rwCount, so it is > 0, indicating readers */
	rwLockPtr->rwCount ++;
	kMutexUnlock(&rwLockPtr->lock);
}

/* a reader releases and wakes a single writer */
/* if it is the last reader */
VOID RwLockReleaseRead(RwLock_t *const rwLockPtr)
{
	kMutexLock(&rwLockPtr->lock, RK_WAIT_FOREVER);
	rwLockPtr->rwCount --;
	if (rwLockPtr->rwCount == 0)
	{
		kCondVarSignal(&rwLockPtr->writersGo);
	}
	kMutexUnlock(&rwLockPtr->lock);
}

In the image below, 4 tasks — a fast writer (Task 1), a slow writer (Task 4) and two readers (Task 3 is faster than Task 2) — read from and write to a shared UINT variable:

readerwriter 4

6.7. Task Mail

Within a Task Control Block

Pointer to a message (VOID*)

6.7.1. Description

In RK0 we define Mails as 1-word Messages. A Mail is often a pointer, but not necessarily.

Every TCB has a built-in Mail slot, whose type is VOID*. The choice of VOID* instead of ULONG is because, in task-to-task communication, the message type is often application-dependent, and it makes explicit that we are passing by reference.

Still, passing a message that is simply a number within 32-bit range is possible by casting it to an integer. It is passed by copy in that case.

  • Sender: drop a pointer into the target’s mail; the call never blocks and simply replaces stale mail.

  • Receiver: reads its own mail. If empty, it blocks with a suitable timeout or returns unsuccessfully. On success the pointer is returned and the slot is cleared.

The slot is either EMPTY (NULL) or FULL (has a value).

(One could think that overloading Task Events as a Mail would be a better approach. We understand it breaks semantics.)
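The single-slot semantics reduce to a few lines of plain C. This is a behavioural sketch of the EMPTY/FULL rules above, not the kernel implementation (names are mine):

```c
#include <assert.h>
#include <stddef.h>

/* toy single-slot mail: NULL means EMPTY; send never blocks and
   replaces stale mail; a successful receive clears the slot */
static void *slot = NULL;

static void mail_send(void *msg) { slot = msg; }

static void *mail_recv(void)
{
    void *m = slot;
    slot = NULL;
    return m; /* NULL means the slot was empty */
}
```

The overwrite-on-send behaviour is what makes the mail a latest-value channel rather than a queue: a slow receiver sees only the newest message.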

6.7.2. Suitable usage patterns

A Task Mail is a single-slot inbox, not a queue. It works best when the receiver only cares about the newest value, or when a higher-level protocol guarantees that only one message can be outstanding at a time.

6.7.2.1. Latest-snapshot handoff

The producer updates a shared snapshot and mails its address to the consumer. If the consumer falls behind, stale snapshots are naturally dropped and only the latest one remains.

struct pos
{
    FLOAT x;
    FLOAT y;
    FLOAT z;
} K_ALIGN(4);

typedef struct pos Position_t;

static volatile Position_t g_lastPos;

/* Producer posts newest sample; never blocks and overwrites if reader is late */
VOID SensorTask(VOID *args)
{
    RK_UNUSEARGS;

    while (1)
    {

        HAL_getPosition((Position_t *)&g_lastPos);
        kMailSend(consumerHandle, (VOID *)&g_lastPos);
        kSleepRelease(SENSOR_PERIOD);
    }
}

/* Consumer blocks for the newest snapshot */
VOID Consumer(VOID *args)
{
    RK_UNUSEARGS;
    Position_t *recvPtr = NULL;
    Position_t  sample;

    while (1)
    {
        if (kMailRecv((VOID **)&recvPtr, RK_WAIT_FOREVER) == RK_ERR_SUCCESS)
        {
            sample = *recvPtr;   /* copy immediately */
            processSample(&sample);
        }
    }
}
6.7.2.2. Latest snapshot with double buffering

This example is more illustrative, although it reflects a practical issue: if an update can happen while the consumer is still copying, data integrity is compromised. You can use double buffering: the producer writes into one slot, hands it off, then swaps. The next level is triple buffering, and eventually ring buffers or message queues, but that means drifting away from the freshest data, if freshness matters. This will be discussed later with a solution: the Most-Recent Message protocol.

/* shared */
static volatile Position_t isrSample;   /* written only in ISR */
static Position_t buf[2];
static UINT rdIdx = 0, wrIdx = 1;

#define EV_POS_READY RK_EVENT_1

/* ISR: just take the snapshot and notify */
void ISR(void)
{
    HAL_getPosition((Position_t *)&isrSample); /* single writer */
    kEventSet(sensorHandle, EV_POS_READY);
}

/* Sensor task: copy into its double buffer, then mail the stable copy */
void SensorTask(void *args)
{
    RK_UNUSEARGS;
    while (1)
    {
        kEventGet(EV_POS_READY, RK_EVENT_ALL, NULL, RK_WAIT_FOREVER);

        buf[wrIdx] = *(Position_t *)&isrSample; /* full struct copy */

        /* swap */
        rdIdx = wrIdx;
        wrIdx = rdIdx ^ 1U;
        /* send */
        kMailSend(consumerHandle, &buf[rdIdx]);

    }
}

/* Consumer: copy immediately */
VOID ConsumerTask(VOID *args)
{
    RK_UNUSEARGS;
    Position_t *recvPtr = NULL;
    Position_t sample;

    while (1)
    {
        if (kMailRecv((VOID **)&recvPtr, RK_WAIT_FOREVER) == RK_ERR_SUCCESS)
        {
            sample = *recvPtr;
            processSample(&sample);
        }
    }
}

6.8. Message Queue

Message Queue Control Block

Owner Task

Buffer Address

Message Size

Number of Messages

Write Position

Read Position

Notify callback

Waiting queue

Message Queues (RK_MESG_QUEUE) are public message-passing kernel objects with no ownership. Any task can send to or receive from a queue; access is governed only by the queue state (full/empty) and the chosen blocking policy.

Each message queue has backing storage holding N messages of a fixed size S. We say that C = N x S [Words] is the queue capacity.

Message Queues transmit by copy. Sending and receiving are either blocking or non-blocking.

6.8.1. Size of a Message

Each declared queue has a fixed message size, set at initialisation, which can be 1, 2, 4 or 8 WORDs (4, 8, 16, 32 BYTEs). This constraint is intentional: word-aligned copies are faster, predictable and safer for type casting.

(A word-aligned single copy will take ~5 cycles in Cortex-M3/4/7, and ~6 cycles on Cortex-M0/M0+.)
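The sizing arithmetic is straightforward; below, a plain-C sketch of the C = N x S rule and the allowed message sizes (helper names and the WORD_SIZE macro are mine):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define WORD_SIZE 4U /* bytes per word on ARMv6/7-M */

/* allowed fixed message sizes, in words: 1, 2, 4 or 8 */
static bool valid_mesg_words(uint32_t s)
{
    return (s == 1U) || (s == 2U) || (s == 4U) || (s == 8U);
}

/* queue capacity C = N x S, in words */
static uint32_t capacity_words(uint32_t nMesg, uint32_t mesgWords)
{
    return nMesg * mesgWords;
}

/* example backing storage: 8 messages of 2 words (8 bytes) each */
static uint32_t queueBuf[8U * 2U];
```

Declaring the backing buffer from the same N and S used at initialisation keeps the storage and the queue configuration consistent by construction.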

6.8.2. Mailboxes

A single-message queue is said to be a Mailbox. It can be of any supported size (1, 2, 4, 8 words), but often one makes it a 1-word message to pass pointers. When used with blocking send/recv it is useful for synchronising tasks while exchanging information.

The method sendovw(), which overwrites the current message, works only on single-message queues. In this case a 1:N model can work well, if the receivers use a peek() for a non-destructive read.

6.8.3. Ownership

Ownership can be granted to a queue, so a single receiver is enforced. An owner task is assigned with queuesetowner(), and only that task can receive messages from the queue. If a higher-priority sender blocks on a full owner-bound queue, the owner task can be boosted to prevent priority inversion. Queues that have owners can use most operations pointing directly to the owner task, and a task that owns a queue uses recv(&recvPtr, timeout) to read from its own message queue.

6.8.4. Notify callback

A callback can be registered to be triggered whenever a message is successfully sent to a queue. This is a means of notification. The callback must be short and non-blocking, normally signalling a semaphore or setting an event on a task.

6.8.5. Usage Examples

6.8.6. Mail Queue pattern

A mail queue is a very useful pattern for message-passing.

It is done by combining a Memory Partition and a 1-word message queue with N > 1.

The sender allocates and the receiver frees the partition buffer. This keeps integrity and low overhead, as the copy from sender to queue and from queue to receiver is reduced to one word.

Below, snippets of the Application Logger facility that uses this pattern.

/* Application Logger pattern */

/* standard log structure */
struct log
{
    RK_TICK t; /* timestamp */
    CHAR s[LOGLEN]; /*formatted string */
    UINT    level; /* level 0=message, 1=fault */
} K_ALIGN(4);

typedef struct log Log_t;

/* logger mem allocator + mem pool */
static RK_MEM_PARTITION qMem;
static Log_t logBufPool[LOGPOOLSIZ] K_ALIGN(4);

/* backing buffer for the logger queue */
/* (messages are 1-word-size, number equals the pool) */
RK_DECLARE_MESG_QUEUE_BUF(logQBuf, VOID *, LOGPOOLSIZ)

/* logger mail queue */
static RK_MESG_QUEUE logQ;

/* a sender allocates a buffer, writes the log
message and enqueues it on the mail queue, non-blocking:
if the queue is full it returns the buffer immediately;
if the memory pool is empty it drops the operation */

/* excerpt of logPost(...) */
VOID logPost(/*formatted string */)
    ---snip---
    Log_t *logPtr = (Log_t*)kMemPartitionAlloc(&qMem);
    RK_BARRIER
    if (logPtr) /* available buffer */
    {

       < fill the buffer >

        /* enqueue address */
        if (kMesgQueueSend(&logQ, &logPtr, RK_NO_WAIT) != RK_ERR_SUCCESS)
        {
            /* queue is full, deallocate buf */
            RK_ERR err = kMemPartitionFree(&qMem, logPtr);
            K_ASSERT(err==RK_ERR_SUCCESS);
        }

    }
    ---snip---

/* excerpt of the logger task */

static VOID LoggerTask(VOID *args)
{
    RK_UNUSEARGS
    while (1)
    {

        VOID *recvPtr = NULL;

        /* drain the queue: keep receiving while successful */
        while (kMesgQueueRecv(&logQ, &recvPtr, RK_WAIT_FOREVER) == RK_ERR_SUCCESS)
        {
            < print buffer contents >

            /* deallocate */
            RK_ERR err = kMemPartitionFree(&qMem, recvPtr);
            K_ASSERT(err == RK_ERR_SUCCESS);

        }
    }
}

This is one of the many ways of using the Mail Queue pattern.

The entire implementation can be seen at app\logger.c.

6.8.7. Queue Select using Notify Callback

A task is receiving from many queues and needs to know which one has completed a send.

The notifyCbk(queue*) is executed every time a send succeeds. In this case it sets an event flag on the consumer task. The event flags indicate - as a contract - which queue has completed sends. Note that, as sends may coalesce while a flag caps at 1, the consumer drains each queue until it is empty, or it is preempted.

/* Many-to-1 queue channels */

/* The consumer selects queues based on its event
flags, which a successful send triggers */

#define LOG_PRIORITY 4 /* keep logger as lowest-priority user task */
#define STACKSIZE 256

#define NQUEUES 3 /* number of queues */
#define QSIZ 8 /* depth of each queue */

#define Q0_FLAG   RK_EVENT_1 // (1<<0)
#define Q1_FLAG   RK_EVENT_2 // (1<<1)
#define Q2_FLAG   RK_EVENT_3 // (1<<2)
#define QFLAGS   (ULONG)(Q0_FLAG | Q1_FLAG | Q2_FLAG)

typedef struct
{
    RK_TASK_HANDLE producer;
    UINT payload;
} MESG_t;

/* Successful Send callbacks */
/* each callback follows this pattern */
static inline
VOID sendNotify0(RK_MESG_QUEUE *qPtr)
{
    (VOID)qPtr;
    kTaskEventSet(consumerHandle, Q0_FLAG);
    /* Q1 flag for queue1 and so forth */
}

/* each callback is installed using kMesgQueueInstallSendCbk
on kApplicationInit() */

/* helper to send */
static inline
VOID enqueueSample(RK_MESG_QUEUE *qPtr, UINT payload)
{
    MESG_t mesg = {
        .payload = payload,
        .producer = RK_RUNNING_HANDLE,
    };
    RK_ERR err = kMesgQueueSend(qPtr, &mesg, RK_WAIT_FOREVER);
    K_ASSERT(err == RK_ERR_SUCCESS);
}


VOID Prod0Task(VOID *args)
{
    RK_UNUSEARGS
    UINT seq = 0U;

    while (1)
    {
        enqueueSample(&queues[0], seq++);
        kSleepRelease(25); /* fast producer */
    }
}

VOID Prod1Task(VOID *args)
{
    UINT seq = 0U;
    RK_UNUSEARGS

    while (1)
    {
        enqueueSample(&queues[1], seq++);

        /* every fourth sample, also tickle the third queue */
        if ((seq & 0x3U) == 0U)
        {
            enqueueSample(&queues[2], seq);
        }

        kSleepRelease(60);
    }
}

/* Consumer listens on all queues, selecting those on its signal flags. */
VOID ConsumerTask(VOID *args)
{
    RK_UNUSEARGS

    MESG_t recv = {0};
    ULONG flags = 0UL;

    while (1)
    {
        flags = 0UL;
        kTaskEventGet(QFLAGS, RK_EVENT_FLAGS_ANY, &flags,
                           RK_WAIT_FOREVER);

        for (UINT i = 0; i < NQUEUES; ++i)
        {
            if (flags & (1UL << i))
            {
                while (kMesgQueueRecv(&queues[i], (VOID*)&recv, RK_NO_WAIT) ==
                       RK_ERR_SUCCESS)
                {
                    logPost("Q%u <- sender=%s payload=%u", i,
                            RK_TASK_NAME(recv.producer), recv.payload);
                }
            }
        }
    }
}
       0 ms :: Q1 <- sender=Prod1 payload=0
     250 ms :: Q0 <- sender=Prod0 payload=1
     500 ms :: Q0 <- sender=Prod0 payload=2
     600 ms :: Q1 <- sender=Prod1 payload=1
     750 ms :: Q0 <- sender=Prod0 payload=3
    1000 ms :: Q0 <- sender=Prod0 payload=4
    1200 ms :: Q1 <- sender=Prod1 payload=2
    1250 ms :: Q0 <- sender=Prod0 payload=5
    1500 ms :: Q0 <- sender=Prod0 payload=6
    1750 ms :: Q0 <- sender=Prod0 payload=7
    1800 ms :: Q1 <- sender=Prod1 payload=3
    1800 ms :: Q2 <- sender=Prod1 payload=4
    2000 ms :: Q0 <- sender=Prod0 payload=8
    2250 ms :: Q0 <- sender=Prod0 payload=9
    2400 ms :: Q1 <- sender=Prod1 payload=4
    2500 ms :: Q0 <- sender=Prod0 payload=10
    2750 ms :: Q0 <- sender=Prod0 payload=11
    3000 ms :: Q0 <- sender=Prod0 payload=12
    3000 ms :: Q1 <- sender=Prod1 payload=5
    3250 ms :: Q0 <- sender=Prod0 payload=13
    3500 ms :: Q0 <- sender=Prod0 payload=14
    3600 ms :: Q1 <- sender=Prod1 payload=6
    3750 ms :: Q0 <- sender=Prod0 payload=15
    4000 ms :: Q0 <- sender=Prod0 payload=16
    4200 ms :: Q1 <- sender=Prod1 payload=7
    4200 ms :: Q2 <- sender=Prod1 payload=8
    4250 ms :: Q0 <- sender=Prod0 payload=17
    4500 ms :: Q0 <- sender=Prod0 payload=18
    4750 ms :: Q0 <- sender=Prod0 payload=19
    4800 ms :: Q1 <- sender=Prod1 payload=8
    5000 ms :: Q0 <- sender=Prod0 payload=20
    5250 ms :: Q0 <- sender=Prod0 payload=21

6.9. Channels (procedure calls)

Channels (RK_CHANNEL) implement client‑blocking procedure calls. Each client enqueues a request pointer and blocks until the server completes that request. Multiple clients can be pending at once. The server processes requests and completes them with kChannelDone().

Each Channel has a request-envelope pool. An envelope carries sender metadata and application pointers for request and response payloads. Requesters blocked in kChannelCall() are tracked by the channel itself and are dequeued and readied when kChannelDone() runs. While servicing a call, the server adopts the client’s priority and restores its nominal priority after done().

Channel Control Block:

  • Ring Buffer (request pointers)

  • Server Task

  • Waiting Receivers

  • Waiting Requesters

  • Request Pool

An RK_REQ_BUF is a request envelope: a structure containing metadata about the sender and pointers to the request and response payloads. The channel keeps a pool of these envelopes, and clients must allocate one to make a call.

Request Buffer:

  • Server Task

  • Pointer to Request Payload

  • Size of Request Payload

  • Pointer to Response Buffer

6.9.1. Usage Example: HVAC Control System

/*
 * Example: HVAC command protocol over RK_CHANNEL.
 *
 * APDU request frame (client -> server):
 *   INSTRUCTION | PAYLOADSIZE | PAYLOAD | CRC16
 *
 * Response (server -> client):
 *   CRC16
 *
 *
 * Control-system idea:
 * - Multiple clients represent independent control loops (thermostat,
 *   occupancy, air quality).
 * - A single HVAC server task owns the plant state and applies every command.
 * - CHANNEL calls serialise updates, so state transitions are deterministic.
 * - The response CRC is an acknowledgement fingerprint of
 *   {instruction, execution-result, current HVAC state}.
*/

#include <kapi.h>
#include <logger.h>

#define LOG_PRIORITY 5
#define STACKSIZE 256

#define HVAC_CHANNEL_DEPTH 4U /* max number of pending requests */
#define HVAC_APDU_MAX_PAYLOAD 8U /* max payload size in bytes */

/* instructions */
#define HVAC_INS_SET_POWER ((BYTE)0x10U)
#define HVAC_INS_SET_MODE ((BYTE)0x11U)
#define HVAC_INS_SET_TARGET_TEMP ((BYTE)0x12U)
#define HVAC_INS_SET_FAN_PERCENT ((BYTE)0x13U)

/* plant mode */
#define HVAC_MODE_HEAT ((BYTE)0x1U)
#define HVAC_MODE_COOL ((BYTE)0x2U)
#define HVAC_MODE_FAN ((BYTE)0x3U)

/* limits */
#define HVAC_MIN_TEMP_C ((BYTE)16U)
#define HVAC_MAX_TEMP_C ((BYTE)30U)

typedef struct
{
    BYTE instruction;
    BYTE payloadSize;
    BYTE payload[HVAC_APDU_MAX_PAYLOAD];
    USHORT crc;
} HVAC_APDU;

typedef struct
{
    BYTE powerOn;
    BYTE mode;
    BYTE targetTempC;
    BYTE fanPercent;
} HVAC_STATE;

/* control tasks and a single actuator server */
RK_DECLARE_TASK(thermostatHandle, ThermostatTask, thermostatStack, STACKSIZE)
RK_DECLARE_TASK(occupancyHandle, OccupancyTask, occupancyStack, STACKSIZE)
RK_DECLARE_TASK(airQualityHandle, AirQualityTask, airQualityStack, STACKSIZE)
RK_DECLARE_TASK(hvacServerHandle, HvacServerTask, hvacServerStack, STACKSIZE)

/* declare the channel and the buffer to enqueue the requests */
static RK_CHANNEL hvacChannel;
RK_DECLARE_CHANNEL_BUF(hvacChannelBuf, HVAC_CHANNEL_DEPTH)

/* request pool for the channel */
static RK_MEM_PARTITION hvacReqPartition;
static RK_REQ_BUF hvacReqPool[HVAC_CHANNEL_DEPTH] K_ALIGN(4);

/* crc computation */
static USHORT HvacCrc16Ccitt_(BYTE const *const dataPtr, UINT const len)
{
    USHORT crc = (USHORT)0xFFFFU;

    for (UINT i = 0U; i < len; ++i)
    {
        crc ^= (USHORT)((USHORT)dataPtr[i] << 8U);

        for (UINT bit = 0U; bit < 8U; ++bit)
        {
            if ((crc & (USHORT)0x8000U) != 0U)
            {
                crc = (USHORT)((USHORT)(crc << 1U) ^ (USHORT)0x1021U);
            }
            else
            {
                crc = (USHORT)(crc << 1U);
            }
        }
    }

    return (crc);
}

static USHORT HvacBuildApduCrc_(HVAC_APDU const *const apduPtr)
{
    BYTE frame[2U + HVAC_APDU_MAX_PAYLOAD];
    UINT const payloadSize = (UINT)apduPtr->payloadSize;

    K_ASSERT(payloadSize <= HVAC_APDU_MAX_PAYLOAD);

    frame[0] = apduPtr->instruction;
    frame[1] = apduPtr->payloadSize;

    for (UINT i = 0U; i < payloadSize; ++i)
    {
        frame[2U + i] = apduPtr->payload[i];
    }

    return (HvacCrc16Ccitt_(frame, 2U + payloadSize));
}

static USHORT HvacBuildResponseCrc_(BYTE const instruction, RK_BOOL const executed, HVAC_STATE const *const statePtr)
{
    /* Response: CRC over execution status + state. */
    BYTE responseFrame[6U];

    responseFrame[0] = instruction;
    responseFrame[1] = (BYTE)((executed != RK_FALSE) ? 1U : 0U);
    responseFrame[2] = statePtr->powerOn;
    responseFrame[3] = statePtr->mode;
    responseFrame[4] = statePtr->targetTempC;
    responseFrame[5] = statePtr->fanPercent;

    return (HvacCrc16Ccitt_(responseFrame, 6U));
}

static RK_BOOL HvacExecuteInstruction_(HVAC_APDU const *const apduPtr, HVAC_STATE *const statePtr)
{
    /* command dispatcher: validates limits before actuation. */
    UINT const payloadSize = (UINT)apduPtr->payloadSize;

    if (payloadSize > HVAC_APDU_MAX_PAYLOAD)
    {
        return (RK_FALSE);
    }

    switch (apduPtr->instruction)
    {
    case HVAC_INS_SET_POWER:
        if ((payloadSize != 1U) || (apduPtr->payload[0] > 1U))
        {
            return (RK_FALSE);
        }
        statePtr->powerOn = apduPtr->payload[0];
        return (RK_TRUE);

    case HVAC_INS_SET_MODE:
        if (payloadSize != 1U)
        {
            return (RK_FALSE);
        }
        if ((apduPtr->payload[0] != HVAC_MODE_HEAT) &&
            (apduPtr->payload[0] != HVAC_MODE_COOL) &&
            (apduPtr->payload[0] != HVAC_MODE_FAN))
        {
            return (RK_FALSE);
        }
        statePtr->mode = apduPtr->payload[0];
        return (RK_TRUE);

    case HVAC_INS_SET_TARGET_TEMP:
        if (payloadSize != 1U)
        {
            return (RK_FALSE);
        }
        if ((apduPtr->payload[0] < HVAC_MIN_TEMP_C) ||
            (apduPtr->payload[0] > HVAC_MAX_TEMP_C))
        {
            return (RK_FALSE);
        }
        statePtr->targetTempC = apduPtr->payload[0];
        return (RK_TRUE);

    case HVAC_INS_SET_FAN_PERCENT:
        if ((payloadSize != 1U) || (apduPtr->payload[0] > 100U))
        {
            return (RK_FALSE);
        }
        statePtr->fanPercent = apduPtr->payload[0];
        return (RK_TRUE);

    default:
        return (RK_FALSE);
    }
}

static USHORT HvacChannelCall_(BYTE const instruction,
                               BYTE const *const payloadPtr,
                               BYTE const payloadSize)
{
    /* Synchronous command transaction:
     * allocate request envelope, send APDU, block until server completes.
     */
    HVAC_APDU apdu = {0};
    USHORT responseCrc = 0U;

    RK_REQ_BUF *reqBuf = (RK_REQ_BUF*)kMemPartitionAlloc(&hvacReqPartition);
    K_ASSERT(reqBuf != NULL);

    K_ASSERT((UINT)payloadSize <= HVAC_APDU_MAX_PAYLOAD);

    apdu.instruction = instruction;
    apdu.payloadSize = payloadSize;

    for (UINT i = 0U; i < (UINT)payloadSize; ++i)
    {
        apdu.payload[i] = payloadPtr[i];
    }

    apdu.crc = HvacBuildApduCrc_(&apdu);

    reqBuf->size = (ULONG)sizeof(HVAC_APDU);
    reqBuf->reqPtr = &apdu;
    reqBuf->respPtr = &responseCrc;

    RK_ERR err = kChannelCall(hvacServerHandle, reqBuf, RK_WAIT_FOREVER);
    K_ASSERT(err == RK_ERR_SUCCESS);

    return (responseCrc);
}

VOID HvacServerTask(VOID *args)
{
    RK_UNUSEARGS

    /* Single-writer plant model: only this task mutates HVAC_STATE. */
    HVAC_STATE hvacState =
    {
        .powerOn = 0U,
        .mode = HVAC_MODE_COOL,
        .targetTempC = 22U,
        .fanPercent = 40U
    };

    while (1)
    {
        /*
          SERVER EXECUTION FLOW:
          1) Accept command
          2) validate APDU
          3) apply instruction
          4) emit response CRC
          5) complete channel call.
         */
        RK_REQ_BUF *reqBuf = NULL;
        RK_ERR err = kChannelAccept(&hvacChannel, &reqBuf, RK_WAIT_FOREVER);
        K_ASSERT(err == RK_ERR_SUCCESS);

        HVAC_APDU const *apduPtr = (HVAC_APDU const *)reqBuf->reqPtr;
        USHORT *responseCrcPtr = (USHORT *)reqBuf->respPtr;

        K_ASSERT(apduPtr != NULL);
        K_ASSERT(responseCrcPtr != NULL);

        RK_BOOL valid = (RK_BOOL)(reqBuf->size == (ULONG)sizeof(HVAC_APDU));
        RK_BOOL executed = RK_FALSE;

        if (valid != RK_FALSE)
        {
            USHORT expectedCrc = HvacBuildApduCrc_(apduPtr);
            valid = (RK_BOOL)(expectedCrc == apduPtr->crc);
        }

        if (valid != RK_FALSE)
        {
            executed = HvacExecuteInstruction_(apduPtr, &hvacState);
        }

        *responseCrcPtr = HvacBuildResponseCrc_(apduPtr->instruction,
                                                executed,
                                                &hvacState);

        if ((valid != RK_FALSE) && (executed != RK_FALSE))
        {
            logPost("[HVAC-SRV] INS=0x%02x OK PWR=%u MODE=%u T=%uC FAN=%u%% RESP_CRC=0x%04x",
                    (UINT)apduPtr->instruction,
                    (UINT)hvacState.powerOn,
                    (UINT)hvacState.mode,
                    (UINT)hvacState.targetTempC,
                    (UINT)hvacState.fanPercent,
                    (UINT)(*responseCrcPtr));
        }
        else
        {
            logPost("[HVAC-SRV] INS=0x%02x INVALID REQ_CRC=0x%04x RESP_CRC=0x%04x",
                    (UINT)apduPtr->instruction,
                    (UINT)apduPtr->crc,
                    (UINT)(*responseCrcPtr));
        }

        err = kChannelDone(reqBuf);
        K_ASSERT(err == RK_ERR_SUCCESS);
    }
}

VOID ThermostatTask(VOID *args)
{
    RK_UNUSEARGS

    /*  chooses mode and target temperature setpoint. */
    BYTE payload[1U] = {0U};

    while (1)
    {
        payload[0] = HVAC_MODE_HEAT;
        USHORT crcMode = HvacChannelCall_(HVAC_INS_SET_MODE, payload, 1U);
        logPost("[THERMO] SET_MODE=%u RESP_CRC=0x%04x", (UINT)payload[0],
                (UINT)crcMode);

        payload[0] = 24U;
        USHORT crcTemp =
            HvacChannelCall_(HVAC_INS_SET_TARGET_TEMP, payload, 1U);
        logPost("[THERMO] SET_TEMP=%uC RESP_CRC=0x%04x", (UINT)payload[0],
                (UINT)crcTemp);

        kSleep(80U);

        payload[0] = HVAC_MODE_COOL;
        crcMode = HvacChannelCall_(HVAC_INS_SET_MODE, payload, 1U);
        logPost("[THERMO] SET_MODE=%u RESP_CRC=0x%04x", (UINT)payload[0],
                (UINT)crcMode);

        payload[0] = 20U;
        crcTemp = HvacChannelCall_(HVAC_INS_SET_TARGET_TEMP, payload, 1U);
        logPost("[THERMO] SET_TEMP=%uC RESP_CRC=0x%04x", (UINT)payload[0],
                (UINT)crcTemp);

        kSleep(80U);
    }
}

VOID OccupancyTask(VOID *args)
{
    RK_UNUSEARGS

    /* Presence loop: enables/disables HVAC when occupancy changes.
      (number of people on the room)
    */
    BYTE payload[1U] = {1U};

    while (1)
    {
        USHORT crc = HvacChannelCall_(HVAC_INS_SET_POWER, payload, 1U);
        logPost("[OCCUP] SET_POWER=%u RESP_CRC=0x%04x", (UINT)payload[0],
                (UINT)crc);

        payload[0] = (BYTE)((payload[0] == 0U) ? 1U : 0U);
        kSleep(140U);
    }
}

VOID AirQualityTask(VOID *args)
{
    RK_UNUSEARGS

    /* Air-quality loop: adjusts fan throughput demand. */
    BYTE payload[1U] = {30U};

    while (1)
    {
        USHORT crc = HvacChannelCall_(HVAC_INS_SET_FAN_PERCENT, payload, 1U);
        logPost("[AIRQ ] SET_FAN=%u%% RESP_CRC=0x%04x", (UINT)payload[0],
                (UINT)crc);

        payload[0] = (BYTE)(payload[0] + 20U);
        if (payload[0] > 90U)
        {
            payload[0] = 30U;
        }

        kSleep(110U);
    }
}

VOID kApplicationInit(VOID)
{
    RK_ERR err = kCreateTask(&hvacServerHandle, HvacServerTask, RK_NO_ARGS,
                             "HvacSrv", hvacServerStack, STACKSIZE, 4,
                             RK_PREEMPT);
    K_ASSERT(err == RK_ERR_SUCCESS);

    err = kMemPartitionInit(&hvacReqPartition, hvacReqPool, sizeof(RK_REQ_BUF), HVAC_CHANNEL_DEPTH);
    K_ASSERT(err == RK_ERR_SUCCESS);

    err = kChannelInit(&hvacChannel, hvacChannelBuf, HVAC_CHANNEL_DEPTH, hvacServerHandle, &hvacReqPartition);
    K_ASSERT(err == RK_ERR_SUCCESS);

    err = kCreateTask(&thermostatHandle, ThermostatTask, RK_NO_ARGS, "Thermo", thermostatStack, STACKSIZE, 1, RK_PREEMPT);
    K_ASSERT(err == RK_ERR_SUCCESS);

    err = kCreateTask(&occupancyHandle, OccupancyTask, RK_NO_ARGS, "Occpnc", occupancyStack, STACKSIZE, 2, RK_PREEMPT);
    K_ASSERT(err == RK_ERR_SUCCESS);

    err = kCreateTask(&airQualityHandle, AirQualityTask, RK_NO_ARGS, "AirQal", airQualityStack, STACKSIZE, 3, RK_PREEMPT);
    K_ASSERT(err == RK_ERR_SUCCESS);

    logInit(LOG_PRIORITY);
}
       0 ms :: [HVAC-SRV] INS=0x11 OK PWR=0 MODE=1 T=22C FAN=40% RESP_CRC=0xc0ea
       0 ms :: [HVAC-SRV] INS=0x10 OK PWR=1 MODE=1 T=22C FAN=40% RESP_CRC=0xf3fe
       0 ms :: [HVAC-SRV] INS=0x13 OK PWR=1 MODE=1 T=22C FAN=30% RESP_CRC=0x6b8b
       0 ms :: [THERMO] SET_MODE=1 RESP_CRC=0xc0ea
       0 ms :: [OCCUP] SET_POWER=1 RESP_CRC=0xf3fe
       0 ms :: [AIRQ ] SET_FAN=30% RESP_CRC=0x6b8b
       0 ms :: [HVAC-SRV] INS=0x12 OK PWR=1 MODE=1 T=24C FAN=30% RESP_CRC=0x0d24
       0 ms :: [THERMO] SET_TEMP=24C RESP_CRC=0x0d24
     800 ms :: [HVAC-SRV] INS=0x11 OK PWR=1 MODE=2 T=24C FAN=30% RESP_CRC=0x9a94
     800 ms :: [THERMO] SET_MODE=2 RESP_CRC=0x9a94
     800 ms :: [HVAC-SRV] INS=0x12 OK PWR=1 MODE=2 T=20C FAN=30% RESP_CRC=0x1119
     800 ms :: [THERMO] SET_TEMP=20C RESP_CRC=0x1119
    1100 ms :: [HVAC-SRV] INS=0x13 OK PWR=1 MODE=2 T=20C FAN=50% RESP_CRC=0xb157
    1100 ms :: [AIRQ ] SET_FAN=50% RESP_CRC=0xb157
    1400 ms :: [HVAC-SRV] INS=0x10 OK PWR=0 MODE=2 T=20C FAN=50% RESP_CRC=0x0903
    1400 ms :: [OCCUP] SET_POWER=0 RESP_CRC=0x0903
    1600 ms :: [HVAC-SRV] INS=0x11 OK PWR=0 MODE=1 T=20C FAN=50% RESP_CRC=0x15f3
    1600 ms :: [THERMO] SET_MODE=1 RESP_CRC=0x15f3
    1600 ms :: [HVAC-SRV] INS=0x12 OK PWR=0 MODE=1 T=24C FAN=50% RESP_CRC=0x9e7e
    1600 ms :: [THERMO] SET_TEMP=24C RESP_CRC=0x9e7e
    2200 ms :: [HVAC-SRV] INS=0x13 OK PWR=0 MODE=1 T=24C FAN=70% RESP_CRC=0xe5cd
    2200 ms :: [AIRQ ] SET_FAN=70% RESP_CRC=0xe5cd
    2400 ms :: [HVAC-SRV] INS=0x11 OK PWR=0 MODE=2 T=24C FAN=70% RESP_CRC=0x37dd
    2400 ms :: [THERMO] SET_MODE=2 RESP_CRC=0x37dd
    2400 ms :: [HVAC-SRV] INS=0x12 OK PWR=0 MODE=2 T=20C FAN=70% RESP_CRC=0xbc50
    2400 ms :: [THERMO] SET_TEMP=20C RESP_CRC=0xbc50
    2800 ms :: [HVAC-SRV] INS=0x10 OK PWR=1 MODE=2 T=20C FAN=70% RESP_CRC=0x41a4
    2800 ms :: [OCCUP] SET_POWER=1 RESP_CRC=0x41a4
    3200 ms :: [HVAC-SRV] INS=0x11 OK PWR=1 MODE=1 T=20C FAN=70% RESP_CRC=0x5d54
    3200 ms :: [THERMO] SET_MODE=1 RESP_CRC=0x5d54
    3200 ms :: [HVAC-SRV] INS=0x12 OK PWR=1 MODE=1 T=24C FAN=70% RESP_CRC=0xd6d9
    3200 ms :: [THERMO] SET_TEMP=24C RESP_CRC=0xd6d9
    3300 ms :: [HVAC-SRV] INS=0x13 OK PWR=1 MODE=1 T=24C FAN=90% RESP_CRC=0x40c4
    3300 ms :: [AIRQ ] SET_FAN=90% RESP_CRC=0x40c4

6.10. Most-Recent Message Protocol (MRM)

MRM Control Block:

  • MRM Buffer Allocator

  • Data Buffer Allocator

  • Current MRM Buffer Address

  • Data Size (Message Size)

MRM Buffer:

  • Data Buffer Address

  • Readers Count

Data Buffer:

  • Application-dependent

There is little practical difference between a message that never arrives and one carrying invalid (stale) data. But when wrong or stale data is processed - e.g., to set a control-loop set point - a system can fail badly.

Design Choice: provide a broadcast asynchronous message-passing scheme that guarantees data freshness and integrity for all readers.

Benefits: the system gains a mechanism to meet strict deadlines for events that cannot be predicted at design time.

Control loops reacting to unpredictable time events—like a robot scanning an environment or a drive-by-wire system—require a different message-passing approach. Readers cannot "look at the past" and cannot block. The most recent data must be delivered lock-free and have guaranteed integrity.

Like owner-bound queues, the MRM is a high-level mechanism. It is provided as a kernel service given its distinctive nature and suitability for RK0 applications.

6.10.1. Functional Description

An MRM works as a 1-to-many asynchronous Mailbox - with a lock-free specialisation that enables several readers to get the most recent deposited message with no integrity issues. Whenever a reader reads an MRM buffer, it will find the most recent data transmitted. It can also be seen as an extension of the Double Buffer pattern for a 1:N communication.

The core idea of the MRM protocol is that readers can only access the buffer classified as the 'most recent buffer'. After a writer publishes a message, that is the only message readers can get() - any former message still being processed by a reader was grabbed before the new publish() and, from that point on, can only be unget(), eventually returning to the pool.

To clarify further, the communication steps are listed:

  1. A producer first reserves an MRM Buffer - the reserved MRM Buffer is not available for reading until it is published.

  2. A message buffer is allocated and filled, and its address is within an MRM Buffer. The producer publishes the message. From now on, it is the most recent message. Any former published buffer is no longer visible to new readers.

  3. A reader starts by getting an MRM Buffer. A get() operation delivers a copy of the message to the reader’s scope. Importantly, this operation increases the number of readers associated with that MRM Buffer.

Before ending its cycle, the task releases (unget()) the buffer; on releasing, the kernel checks if the caller task is the last reader and if the buffer being released is not the current MRM Buffer.

If the above conditions are met, the unget() operation will return the MRM buffer to the pool. If there are more readers, OR if it is the current buffer, it remains available.

When the reserve operation detects that the most recent buffer still has readers, a new buffer is allocated to be written and published. If it has no readers, it is reused.

This way, the worst case is a sequence of publish() with no unget() at all — this would lead to the writer finding no buffer to reserve. This is prevented by making: N Buffers = N tasks + 1.

6.10.1.1. MRM Control Block Configuration

What might lead to some confusion when initialising an MRM Control Block is the need for two different pools:

  • One pool will be the storage for the MRM Buffers, which is the data structure for the mechanism.

  • Another pool is for the actual payload. The messages.

Both pools must have the same number of elements: the number of tasks communicating + 1.

  • The size of the data buffers is application-dependent and is passed as a number of words. The minimal message size is 32 bits.

  • If using data structures, keep them 4-byte aligned to take advantage of aligned-memory performance.

6.10.1.2. Usage Example

Consider a modern car - speed variations are of interest to many modules. With a somewhat "naive" approach, let us consider three modules and how they should react when speed varies:

  1. Cruiser Control: For the Cruiser Control, a speed increase might signify the driver wants manual control back, and it will likely turn off.

  2. Windshield Wipers: If they are on, a speed change can affect the electric motor’s adjustments to the air resistance.

  3. Radio: Speed changes reflect the aerodynamic noise - the radio volume might need adjustment.

As the variations are unpredictable, we need a mechanism that delivers the latest speed, in order of importance, to all these modules. From highest to lowest priority - Cruiser, Wipers, and Radio - the three modules range from safety to comfort.

To emulate this scenario, we can write an application with a higher priority task that sleeps and wakes up at pseudo-random times to produce random values that represent the (unpredictable) speed changes.

The snippet below has 4 tasks; the consumers are periodic, using the kSleepRelease() primitive.

There is a sequential counter. When a task wakes and sees it was not incremented, it simply runs with the data it already has.

The producer publishes new data at a random interval, preempting whatever task is running at the moment. Given the random updates, consumers may read the same sample more than once or miss intermediate ones.

typedef struct
{
    UINT speed;
    ULONG timeStamp;
} Mesg_t;

#define STACKSIZE 256
#define N_MRM (5)                          /* Number of MRM buffers: N tasks + 1 */
#define MRM_MESG_SIZE (sizeof(Mesg_t) / 4) /* In WORDS */
RK_MRM MRMCtl;                             /* MRM control block */
RK_MRM_BUF buf[N_MRM];                     /* MRM pool */
Mesg_t data[N_MRM];                        /* message data pool */

RK_DECLARE_TASK(speedSensorHandle, SpeedSensorTask, stack1, STACKSIZE)
RK_DECLARE_TASK(cruiserHandle, CruiserTask, stack2, STACKSIZE)
RK_DECLARE_TASK(wiperHandle, WiperTask, stack3, STACKSIZE)
RK_DECLARE_TASK(radioHandle, RadioTask, stack4, STACKSIZE)
volatile UINT seq = 0;
VOID kApplicationInit(VOID)
{

    kCreateTask(&speedSensorHandle, SpeedSensorTask, RK_NO_ARGS, "SpeedTsk",
                stack1, STACKSIZE, 1, RK_PREEMPT);

    kCreateTask(&cruiserHandle, CruiserTask, RK_NO_ARGS, "CruiserTsk", stack2,
                STACKSIZE, 2, RK_PREEMPT);

    kCreateTask(&wiperHandle, WiperTask, RK_NO_ARGS, "WiperTsk", stack3,
                STACKSIZE, 3, RK_PREEMPT);

    kCreateTask(&radioHandle, RadioTask, RK_NO_ARGS, "RadioTsk", stack4,
                STACKSIZE, 4, RK_PREEMPT);

    kMRMInit(&MRMCtl, buf, data, N_MRM, MRM_MESG_SIZE);

    logInit(5);
}

VOID SpeedSensorTask(VOID *args)
{
    RK_UNUSEARGS

    Mesg_t sendMesg = {0};
    while (1)
    {
        RK_TICK sleepTicks = ((RK_TICK)rand() % 18) + 1;
        kSleep(sleepTicks);
        RK_TICK currTick = kTickGetMs();
        UINT speedValue = (UINT)(rand() % 170) + 1;
        sendMesg.speed = speedValue;
        sendMesg.timeStamp = currTick;
        /* grab a buffer */
        RK_MRM_BUF *bufPtr = kMRMReserve(&MRMCtl);
        if (bufPtr != NULL)
        {
            RK_ERR pubErr = kMRMPublish(&MRMCtl, bufPtr, &sendMesg);
            K_ASSERT(pubErr == RK_ERR_SUCCESS); /* no side effects inside K_ASSERT */
            printf("!!!!! @%lums SPEED UPDATE: %u mph\r\n", kTickGetMs(), speedValue);
            seq += 1;
        }
        else
        { /* should not happen: N buffers = N tasks + 1 */
            logError("MRM protocol could not find a free buffer\r\n");
        }
    }
}

VOID CruiserTask(VOID *args)
{
    RK_UNUSEARGS
    Mesg_t recvMesg = {0};
    while (1)
    {
        RK_MRM_BUF *readBufPtr = kMRMGet(&MRMCtl, &recvMesg);
        if (readBufPtr)
        {
            logPost("CRUISER: (%u mph, %lu ms)", recvMesg.speed, recvMesg.timeStamp);

            kMRMUnget(&MRMCtl, readBufPtr);
        }
        kSleepRelease(4);
    }
}

VOID WiperTask(VOID *args)
{
    RK_UNUSEARGS
    Mesg_t recvMesg = {0};

    while (1)
    {

        RK_MRM_BUF *readBufPtr = kMRMGet(&MRMCtl, &recvMesg);
        if (readBufPtr)
        {
            logPost("WIPERS: (%u mph, %lu ms)", recvMesg.speed, recvMesg.timeStamp);

            kMRMUnget(&MRMCtl, readBufPtr);
        }
        kSleepRelease(7);
    }
}
VOID RadioTask(VOID *args)
{
    RK_UNUSEARGS
    Mesg_t recvMesg = {0};
    while (1)

    {

        RK_MRM_BUF *readBufPtr = kMRMGet(&MRMCtl, &recvMesg);

        if (readBufPtr)
        {
            logPost("RADIO: (%u mph, %lu ms) ", recvMesg.speed, recvMesg.timeStamp);
            kMRMUnget(&MRMCtl, readBufPtr);
        }
        kSleepRelease(11);
    }
}

Thus, different situations can happen:

  • All tasks read the updated pair (speed, time)

  • Not all tasks receive the updated pair because another update happens in between.

  • No tasks receive an update - because another happens too soon.

  • No update happens within the period of a given task. It receives the same value again. No problem.

All these cases are on the log:

Logs show: (last speed record, record time)

  !!!@120ms SPEED UPDATE: 164 mph
     120 ms :: CRUISER: (164 mph, 120 ms)
     140 ms :: WIPERS: (164 mph, 120 ms)
 !!! @150ms SPEED UPDATE: 80 mph
     160 ms :: CRUISER: (80 mph, 150 ms)
     200 ms :: CRUISER: (80 mph, 150 ms)
     210 ms :: WIPERS: (80 mph, 150 ms)
     220 ms :: RADIO: (80 mph, 150 ms)
     240 ms :: CRUISER: (80 mph, 150 ms)
  !!!@280ms SPEED UPDATE: 49 mph
     280 ms :: CRUISER: (49 mph, 280 ms)
     280 ms :: WIPERS: (49 mph, 280 ms)
     320 ms :: CRUISER: (49 mph, 280 ms)
     330 ms :: RADIO: (49 mph, 280 ms)
     350 ms :: WIPERS: (49 mph, 280 ms)
     360 ms :: CRUISER: (49 mph, 280 ms)
     400 ms :: CRUISER: (49 mph, 280 ms)
     420 ms :: WIPERS: (49 mph, 280 ms)
     440 ms :: CRUISER: (49 mph, 280 ms)
     440 ms :: RADIO: (49 mph, 280 ms)
  !!!@450ms SPEED UPDATE: 87 mph
     480 ms :: CRUISER: (87 mph, 450 ms)
     490 ms :: WIPERS: (87 mph, 450 ms)
     520 ms :: CRUISER: (87 mph, 450 ms)
 !!! @540ms SPEED UPDATE: 110 mph
     550 ms :: RADIO: (110 mph, 540 ms)
     560 ms :: CRUISER: (110 mph, 540 ms)
     560 ms :: WIPERS: (110 mph, 540 ms)
     600 ms :: CRUISER: (110 mph, 540 ms)
     630 ms :: WIPERS: (110 mph, 540 ms)
 !!! @640ms SPEED UPDATE: 22 mph
     640 ms :: CRUISER: (22 mph, 640 ms)
     660 ms :: RADIO: (22 mph, 640 ms)

The highlight is that controllers keep their pace while receiving fresh data - you can see it in the timestamps of the log above.

Again, they might receive the same data more than once or miss samples; what is important is that they are not lagging and consuming stale data.

7. Error Handling

7.1. Fail fast

While tracing and error handling still need substantial improvement (version 1.0.0 will be released once they land), RK0 currently employs a fail-fast policy in debug mode.

When Error Checking is enabled, every kernel call will be 'defensive', checking for correctness of parameters and invariants, null dereferences, etc.

In these cases it is more useful to let the first error halt execution by calling an Error Handler function, so the program state can be observed.

A trace structure records the address of the running TCB, its current stack pointer, the link register (that is, the PC at the point kErrHandler was called), and a time stamp.

This record lives in a .noinit RAM section, so it remains visible if the CPU resets. A fault code is stored both in a global faultID and in the trace structure. Developers can hook in custom behaviour.

If the kernel is configured to not halt on a fault, but Error Checking is enabled, functions will return negative values in case of an error.

On the other hand, when Error Checking is disabled or NDEBUG is defined, nothing is checked, reducing code size and improving performance.

(Some deeper internal calls have assertions. For those, only defining NDEBUG ensures they are disabled.)

7.2. Stack Overflow

Stack overflow is detected (not prevented) using a "stack painting" with a sentinel word. Stack Overflow detection is enabled by defining the assembler preprocessor __KDEF_STACKOVFLW when compiling.

As a matter of fact, sizing your stacks is something you must do diligently when programming a system. I would say a mechanism for stack overflow detection is at the bottom of the list of 'must-have' features.

One can take advantage of the static task model - it is possible to predict offline the deepest call within any task. The compiler flag -fstack-usage creates .su files indicating the depth of every function within a module. This is an example of compilation-unit output:

core/src/ksema.c:34:8:kSemaphoreInit	88	static
core/src/ksema.c:74:8:kSemaphorePend	96	static
core/src/ksema.c:189:8:kSemaphorePost	88	static
core/src/ksema.c:306:5:kSemaphoreQuery	40	static
core/src/kmutex.c:128:8:kMutexInit	16	static
core/src/kmutex.c:165:8:kMutexLock	120	static
core/src/kmutex.c:325:8:kMutexUnlock	96	static
core/src/kmutex.c:425:6:kMutexQuery	56	static

These are the worst cases. Now, identify the depth of the longest chain of calls for a task using these services and add a generous safety margin, say 30%. The cap depends on your budget.

Importantly, you also have to size the System Stack. This initial size is defined in linker.ld by the symbol Min_Stack_Size. In this case, account for the depth of main(), kApplicationInit(), and all interrupt handlers; again, inspect the longest call chain depth. Assume interrupts always add to the worst static depth, and account for nested interrupts.

7.3. Deadlocks

There are deadlock-recovery techniques in the literature, but they are generally infeasible on this class of system. The kernel provides bounded waiting, enforces priority-ordered waiting queues, applies mutex priority inheritance, and offers lock-free primitives and period-drift compensation. None of these techniques prevents deadlocks by itself (and with bounded blocking plus lock-free primitives, one can still get livelocks).

  • Ordered Locking:

For those programming the application, beyond following the RMS rule (higher request rate, higher priority), the golden rule for locking is to acquire resources in a single unidirectional order throughout the entire application:

acquire(A);
acquire(B);
acquire(C);
   .
   .
   .
release(C);
release(B);
release(A);

This breaks circular waiting.

For instance:

TaskA:
   wait(R1);
   wait(R2);
    /* critical section */
   signal(R2);
   signal(R1);

TaskB:
   wait(R1);
   wait(R2);
    /* critical section */
   signal(R2);
   signal(R1);

But, if:

TaskA:
    wait(R1);
    wait(R2);
    .
    .

TaskB:
    wait(R2);
    wait(R1);
    .
    .

Depending on interleaving, there are two classes of outcome:

  1. Deadlock:

    • TaskA runs: acquires R1

    • TaskB runs: acquires R2

    • TaskA runs: tries to acquire R2 — blocked

    • TaskB runs: tries to acquire R1 — blocked

  2. No deadlock:

    • TaskA runs: acquires R1

    • TaskA runs: acquires R2 (nobody is holding R2)

    • TaskA releases both; TaskB runs and acquires both (in either order)

Overall, there is no deadlock as long as the tasks' critical sections never overlap in time. That is why systems can run for years without a deadlock and then, one day: ploft.

  • Use a 'master-lock' with 'try' semantics

Another technique can be employed when one needs to acquire multiple locks: acquire them all or none, using a try-lock (RK_NO_WAIT). If any try fails, the task gives up, releases every lock it did acquire, and backs off to retry later (most simply, from a sleep queue). That is easier said than done, though; done poorly, instead of deadlocks one gets livelocks.

(In a livelock, tasks keep running and changing state, but the program as a whole makes no progress.)

/* pseudocode: acquire M1 and M2, or neither */

    while (attempts-- > 0u)
    {
        if (lock(&m1, RK_NO_WAIT) != OK)
        {
            /* m1 busy: suspend, then try again */
            wait(interval/condition);
            continue;
        }

        if (lock(&m2, RK_NO_WAIT) == OK)
        {
            /* both acquired */
            return (OK);
        }

        /* failed m2: back off m1 (all or nothing) */
        unlock(&m1);

        /* suspend, then try again */
        wait(interval/condition);
    }
    return (ERROR_TIMEOUT);


© 2026 Antonio Giacomelli | All Rights Reserved | www.kernel0.org