1. THE KERNEL AT A GLANCE
1.1. The design approach
RK0 is the result of independent design, a lot of research plus (a whole other lot of) learning by doing. While I may not have seen every edge case, building it from scratch has given me a unique perspective. I invite peers to review and challenge what’s been done so far.
In the embedded realm, probably because we lack a better abstraction, we use multithreading to fine-tune our load balance and responsiveness to achieve real-time.
RK0 Blog: About Processes, Tasks and Threads
This is an arrangement: instead of having a single super-loop, we have many, each running on its execution stack.
This arrangement yields an operating system entity to handle: a (logical) Concurrency Unit. In K0, we name it a Task (in RK0, a task is a thread).
1.1.1. Architecture
At the highest level of abstraction, the kernel has a top and a bottom layer. On the top, the Executive manages the resources needed by the application. On the bottom, the Low-level Scheduler works as a software extension of the CPU. Together, they implement the Task abstraction: the Concurrency Unit that enables a multitasking environment.

In systems design jargon, the Executive enforces policy (what should happen). The Low-level Scheduler provides the mechanism (how it gets done). The services are the primitives that gradually translate policy decisions into concrete actions executed by the Scheduler. K0’s goal is determinism on low-end devices.
Its multitasking engine does not attempt to mimic a userland: tasks run in privileged mode, on a stack pointer separate from the system stack. This trades the already limited 'isolation' its target architecture can provide for tight and predictable control.
1.1.2. Programming with RK0
As will become clear throughout the document, RK0 is built so it does not get in the programmer’s way. It is meant to be transparent, composable, and deterministic, with clear semantics.
1.1.3. Suitable Applications
Given the architecture, RK0 targets applications with the following characteristics:
-
They are designed to handle particular devices in which real-time responsiveness is imperative.
-
Applications and middleware may be implemented alongside appropriate drivers.
-
Drivers may even include the application itself.
-
Untested programs are not loaded: After the software has been tested, it can be assumed reliable.
2. Task Scheduler
RK0 employs a Rate Monotonic Scheduler. Tasks are assigned priorities according to their request rates - i.e., tasks with shorter periods are assigned to higher priorities. The highest priority is represented by the value '0'; the lowest is represented by the value '31'.
A notable property of the scheduler is its constant time complexity (O(1)) and low latency. This was achieved by carefully composing the data structures along with an efficient 'choose-next' algorithm, as detailed below.
Time-slice was deprecated in version 0.5.0.
2.1. Scheduler Data Structures
2.1.1. Task Control Block
Threads are represented as Tasks. Every task is associated with a Task Control Block structure, which is a record for stack, resources, and time management. The table below partially represents a Task Control Block (as this document is live, this might not reflect the exact fields of the current version).
Task Control Block |
---|
Task name |
Saved Stack Pointer |
Stack Address |
Stack Size |
Status |
Assigned Priority |
Current Priority |
Self-Assigned ID |
Last wake-time |
Run-To-Completion Flag |
Time-out Flag |
List of owned Mutexes |
Aggregated Timeout Node |
Aggregated Task List Node |
Tasks are static: they cannot be created or destroyed at runtime, nor can they fork or join.
In practice, tasks are either RUNNING or 'waiting' for their turn to run.

We need to define WAITING and READY clearly:
-
A READY task will be dispatched, and therefore switch to RUNNING, whenever it is the highest priority READY task.
-
A WAITING task depends on a condition, generalised as an event, to switch to READY.
-
Logically, the WAITING state will assume different pseudo-states related to the kind of event that will switch a task to READY:
-
SLEEPING: a task suspends itself and goes to sleep for a given period or suspends itself until receiving a wake signal, representing an event.
-
PENDING: the task suspended itself, waiting for a combination of signal flags.
-
BLOCKED: A task is blocked on a mutex or semaphore.
-
SENDING/RECEIVING: A producer task, when blocking on a Message Passing object, switches its status to SENDING, and a consumer to RECEIVING.
-
The scheduler rules, not the heap.
RK0 tasks are static.
It’s a design decision rooted in real-time correctness.
Besides, application-specific system software does not need to treat tasks as 'unknown' objects.
The wins:
-
A memory layout the systems programmer knows.
-
No alignment traps.
-
Link-time visibility:
-
Each task’s stack is a named symbol in the linker map.
-
You can inspect and verify the memory layout before flashing.
-
A simple objdump reveals all stack allocations; that is peace of mind.

2.1.2. Task Queues
The backbone of the queues where tasks wait for their turn to run is a circular doubly linked list: removing any item from a doubly linked list takes O(1) (provided we don’t need to search for the item). As the kernel knows each task’s address, adding and removing is always O(1). Singly linked lists can’t achieve O(1) for removing an arbitrary node, because finding its predecessor requires a traversal.
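The O(1) removal claim can be illustrated with a minimal sketch of a circular doubly linked list built around a sentinel node. The type and function names below (`Node`, `List`, `listAddTail`, `listRemove`) are hypothetical and chosen for illustration; they are not the RK0 definitions.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical node and list types; not the RK0 definitions. */
typedef struct Node
{
    struct Node *next;
    struct Node *prev;
} Node;

typedef struct
{
    Node     head; /* sentinel: head.next is the first item, head.prev the last */
    unsigned size;
} List;

static void listInit(List *l)
{
    l->head.next = &l->head;
    l->head.prev = &l->head;
    l->size = 0U;
}

/* O(1): link the node just before the sentinel, i.e. at the tail. */
static void listAddTail(List *l, Node *n)
{
    n->prev = l->head.prev;
    n->next = &l->head;
    l->head.prev->next = n;
    l->head.prev = n;
    l->size++;
}

/* O(1): the node carries both neighbours, so no traversal is needed. */
static void listRemove(List *l, Node *n)
{
    assert(l->size > 0U);
    n->prev->next = n->next;
    n->next->prev = n->prev;
    n->next = NULL;
    n->prev = NULL;
    l->size--;
}
```

Because the caller already holds the node address, `listRemove` touches only the node and its two neighbours, whatever the list length.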
2.1.3. Ready Queue Table
Another design choice to achieve O(1) is the global ready queue, which is a table of FIFO queues—each queue dedicated to a priority—and not a single ordered queue. So, enqueuing a ready task is always O(1). Given the sorting needed, the time complexity would be O(n) if tasks were placed on a single ready queue.
2.1.4. Waiting Queues
The scheduler does not have a unique waiting queue. Every kernel object that can block a task has an associated waiting queue. Because these queues are a scheduler component, they follow a priority discipline: the highest priority task is dequeued first, always.
When an event capable of switching tasks from WAITING to READY happens, one or more tasks (depending on the mechanism) are then placed on the ready list, unique to their priority. Now, they are waiting to be picked by the scheduler—that is the definition of READY.
2.1.5. The scheduling algorithm
As the ready queue table is indexed by priority - the index 0 points to the queue of ready tasks with priority 0, and so forth, and there are 32 possible priorities - a 32-bit integer can represent the state of the ready queue table. It is a BITMAP:
The BITMAP computation: ((1a) OR (1b)) AND (2), s.t.:
(1a) Every time a task is readied, update: BITMAP |= (1U << task->priority);
(1b) Every time an empty READY QUEUE becomes non-empty, update: BITMAP |= (1U << queueIndex);
(2) Every time a READY QUEUE becomes empty, update: BITMAP &= ~(1U << queueIndex);
EXAMPLE:
Ready Queue Index :  6 5 4 3 2 1 0
Not empty         :  1 1 1 0 0 1 0
                     ------------->
(LOW)  Effective Priority  (HIGH)
In this case, the scenario is a system with 7 task priority levels. Queues with priorities 6, 5, 4, and 1 are not empty.
In the RK0 source code, the following routines implement the bitmap update:
/* Enqueue a TCB on the tail of a TCB list */
RK_ERR kTCBQEnq(RK_TCBQ *const kobj, RK_TCB *const tcbPtr)
{
    RK_ERR err = kListAddTail(kobj, &(tcbPtr->tcbNode));
    if (err == 0)
    {
        /* if a task was enqueued on a list within the ready queue table,
           update the 'ready bitmap' (unsigned literal: priority may be 31) */
        if (kobj == &readyQueue[tcbPtr->priority])
        {
            readyQBitMask |= (1U << tcbPtr->priority);
        }
    }
    return (err);
}
/* Add a TCB on the head of the TCB list */
RK_ERR kTCBQJam(RK_TCBQ *const kobj, RK_TCB *const tcbPtr)
{
    RK_ERR err = kListAddHead(kobj, &(tcbPtr->tcbNode));
    if (err == 0)
    {
        if (kobj == &readyQueue[tcbPtr->priority])
        {
            readyQBitMask |= (1U << tcbPtr->priority);
        }
    }
    return (err);
}
/* Dequeue the head task from a list of TCBs */
RK_ERR kTCBQDeq(RK_TCBQ *const kobj, RK_TCB **const tcbPPtr)
{
    RK_NODE *dequeuedNodePtr = NULL;
    RK_ERR err = kListRemoveHead(kobj, &dequeuedNodePtr);
    if (err != RK_SUCCESS)
    {
        return (err);
    }
    *tcbPPtr = K_GET_TCB_ADDR(dequeuedNodePtr);
    RK_TCB *tcbPtr_ = *tcbPPtr;
    RK_PRIO prio_ = tcbPtr_->priority;
    /* if the list is in the ready queue table and is now empty,
       update the 'ready bitmap' */
    if ((kobj == &readyQueue[prio_]) && (kobj->size == 0))
    {
        readyQBitMask &= ~(1U << prio_);
    }
    return (err);
}
/* Remove a specific TCB from a TCB List */
RK_ERR kTCBQRem(RK_TCBQ *const kobj, RK_TCB **const tcbPPtr)
{
    RK_NODE *dequeuedNodePtr = &((*tcbPPtr)->tcbNode);
    RK_ERR err = kListRemove(kobj, dequeuedNodePtr);
    if (err != RK_SUCCESS)
    {
        return (err);
    }
    *tcbPPtr = K_GET_TCB_ADDR(dequeuedNodePtr);
    RK_TCB *tcbPtr_ = *tcbPPtr;
    RK_PRIO prio_ = tcbPtr_->priority;
    if ((kobj == &readyQueue[prio_]) && (kobj->size == 0))
    {
        readyQBitMask &= ~(1U << prio_);
    }
    return (err);
}
Having the Ready Queue Table bitmap, we find the highest priority non-empty task list as follows:
(1) Isolate the rightmost '1':
RBITMAP = BITMAP & -BITMAP (unary minus on an unsigned word yields its two's complement: ~BITMAP + 1).
In this case:
[31] [0] : Bit Position
0...1110010 : BITMAP
1...0001110 : -BITMAP
=============
0...0000010 : RBITMAP
[1]
The rationale here is that, for a number N, its two’s complement -N flips every bit to the left of the rightmost '1' and leaves that '1' and the zeroes to its right unchanged (the effect of adding '1' after inverting). Then, N & -N results in a word with all 0-bits except for the least significant '1'.
(2) Extract the rightmost '1' position:
-
For ARMv7M, we benefit from the CLZ instruction to count the leading zeroes. As they are the number of zeroes on the left of the rightmost '1' bit, this value is subtracted from 31 to find the Ready Queue index.
__RK_INLINE static inline
unsigned __getReadyPrio(unsigned readyQBitmap)
{
unsigned ret;
__ASM volatile (
"clz %0, %1 \n"
"neg %0, %0 \n"
"add %0, %0, #31\n"
: "=&r" (ret)
: "r" (readyQBitmap)
:
);
return (ret);
}
This instruction would return #30, and #31 - #30 = #01 in the example above.
-
For ARMv6M there is no suitable hardware instruction. The algorithm is written in C and counts the trailing zeroes, thus, the index number. Although it might vary depending on your compiler settings, it takes ~11 cycles (note it is still O(1)):
/*
De Bruijn's multiply+LUT
(Hacker's Delight book)
*/
/* table is on a ram section for efficiency */
__K_SECTION(getReadyTable)
static const unsigned table[32] =
{
    0, 1, 28, 2, 29, 14, 24, 3, 30, 22, 20, 15, 25, 17, 4, 8,
    31, 27, 13, 23, 21, 19, 16, 7, 26, 12, 18, 6, 11, 5, 10, 9
};
__RK_INLINE static inline
unsigned __getReadyPrio(unsigned readyQBitmap)
{
    /* readyQBitmap is expected to have a single bit set */
    unsigned mult = readyQBitmap * 0x077CB531U;
    /* Keep the top 5 bits as the table index */
    unsigned idx = (mult >> 27);
    /* LUT */
    unsigned ret = (unsigned)table[idx];
    return (ret);
}
For the example above, mult = 0x2 * 0x077CB531 = 0x0EF96A62. The 5 leftmost bits (the index) are 00001 → table[1] = 1.
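A quick way to gain confidence in the multiply-and-lookup step is a host-side check that every single-bit word maps back to its own bit position. The sketch below reuses the constant and table shown above; the function names `lowestSetBitIndex` and `checkAllPositions` are hypothetical helpers for illustration, not part of the RK0 sources.

```c
#include <assert.h>
#include <stdint.h>

/* Same lookup table as above, indexed by the top 5 bits of the product. */
static const uint8_t deBruijnTable[32] =
{
    0, 1, 28, 2, 29, 14, 24, 3, 30, 22, 20, 15, 25, 17, 4, 8,
    31, 27, 13, 23, 21, 19, 16, 7, 26, 12, 18, 6, 11, 5, 10, 9
};

/* Index of the least significant set bit; the bitmap must be non-zero. */
static unsigned lowestSetBitIndex(uint32_t bitmap)
{
    assert(bitmap != 0U);
    /* isolate the rightmost '1': same as bitmap & -bitmap */
    uint32_t isolated = bitmap & (~bitmap + 1U);
    return deBruijnTable[(uint32_t)(isolated * 0x077CB531U) >> 27];
}

/* Returns 1 if each of the 32 single-bit inputs yields its own position. */
static int checkAllPositions(void)
{
    for (unsigned i = 0U; i < 32U; ++i)
    {
        if (lowestSetBitIndex((uint32_t)1U << i) != i)
        {
            return 0;
        }
    }
    return 1;
}
```

With the example bitmap 0...1110010 (0x72), `lowestSetBitIndex` yields 1, the same index obtained in the walkthrough above.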
During a context switch, the procedures to find the highest priority non-empty ready queue table index are as follows:
static inline RK_PRIO kCalcNextTaskPrio_(VOID)
{
if (readyQBitMask == 0U)
{
return (idleTaskPrio);
}
readyQRightMask = readyQBitMask & -readyQBitMask;
RK_PRIO prioVal = (RK_PRIO) (__getReadyPrio(readyQRightMask));
return (prioVal);
}
VOID kSchSwtch(VOID)
{
/* O(1) complexity */
nextTaskPrio = kCalcNextTaskPrio_();
RK_TCB* nextRunPtr = NULL;
/* O(1) complexity */
kTCBQDeq(&readyQueue[nextTaskPrio], &nextRunPtr);
runPtr = nextRunPtr;
}
2.2. System Tasks
There are two System Tasks: the Idle Task and the Timer Handler Task.
The Idle Task runs whenever there is no other ready task to be dispatched, and the CPU enters a low-power state. The kernel assigns the Idle Task priority during initialisation, taking into account all priorities the user has defined. Unless user tasks occupy all 32 priorities, the Idle Task is treated as an ordinary lowest-priority task and has a position in the ready queue table. Otherwise, it is selected if the Ready Queue Bitmap is 0x00000000.
The Post Processing Task handles application demands that some services choose to defer. Currently, it dispatches Application Timer Callbacks.
Nominally its priority is 0, but in practice it could be considered as having priority -1, because it always takes precedence over other tasks with priority 0.
2.3. Handling the scheduler
An essential characteristic of the scheduler is that it is a preemptive run-to-completion scheduler. This term, 'run-to-completion' has slightly different meanings depending on the context. It is often related to strictly cooperative schedulers, in the sense tasks must yield the processor. Otherwise, they monopolise the CPU.
In RK0, tasks with the same priority will work cooperatively. This is different from schedulers that employ a time-slice or a quantum for round-robin: after this time expires, the task is put at the tail of the Ready Queue.
The term run-to-completion here is to be interpreted as follows:
-
The scheduler’s behaviour is to choose the highest priority READY task to run. Always.
-
The scheduler works on a First-In-First-Out discipline for tasks with the same priority.
-
A task must switch to the READY state before being eligible for scheduling.
-
A task will switch from RUNNING to READY if yielding or if being preempted by a higher priority task. Otherwise it can only go to a WAITING state, and eventually switch back to READY.
-
When a task is preempted by a higher priority task, it switches from RUNNING to READY and is placed back on the head position of its Ready Queue. This means that it will be resumed as soon as it is the highest priority ready task again.
-
On the contrary, if a task yields, it tells the scheduler it has completed its cycle. Then, it will be enqueued on the ready queue tail - the last queue position.
-
When a task waits it is suspended until a condition is satisfied.
-
When the condition is satisfied, it switches from WAITING to READY, and is enqueued on the tail.
-
So, tasks with the same priority cooperate by either yielding or waiting.
-
If a task never yields or waits, other tasks with the same or lower priority will starve.
-
Finally, Tasks with the same priority are initially placed on the Ready Queue associated with that priority in the order they are created.
RK0 can handle context-switching with an extended frame when a floating-point co-processor is available. This must be indicated at compile time by defining the corresponding symbol.
3. Timers and Delays
3.1. Busy-wait delay
A busy-wait delay kBusyWait(t) keeps a task spinning for t ticks. That is, the task does nothing, but neither suspends nor yields (although it can be preempted). This service finds its use when simulating workloads.
Context switching is probably the most significant overhead on a kernel. The time spent on the System Tick handler contributes to much of this overhead.
Timeout Node |
---|
Timeout Type |
Absolute Interval (Ticks) |
Relative Interval (Ticks) |
Waiting Queue Address |
Next Timeout Node |
Previous Timeout Node |
Every task is prone to events triggered by timers described in this section. Every Task Control Block has a node to a timeout list.
This list is doubly linked, ordered as a delta list. For instance, three timers (T1,8), (T2,6) and (T3,10) will be ordered as a sequence <(T2,6), (T1,2), (T3,2)> - so it counts <6, (6)+2, ((6)+2)+2>.
Thus, for every system tick, only the head element on the list needs to be decreased - yielding O(1) - another design choice towards deterministic behaviour.
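The delta ordering can be sketched as follows. This is a simplified model under stated assumptions: it uses a singly linked list for brevity (the kernel list is doubly linked), and the names `Timeout`, `deltaInsert`, `deltaTick` and `deltaPopExpired` are hypothetical, not the RK0 API.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical timeout node: 'delta' is relative to the previous node. */
typedef struct Timeout
{
    struct Timeout *next;
    unsigned        delta;
    int             id;
} Timeout;

/* Insert a timer that expires 'ticks' from now, keeping delta order.
   Insertion walks the list; the per-tick work below does not. */
static void deltaInsert(Timeout **head, Timeout *node, unsigned ticks)
{
    assert(node != NULL);
    Timeout **link = head;
    while (*link != NULL && (*link)->delta <= ticks)
    {
        ticks -= (*link)->delta;
        link = &(*link)->next;
    }
    node->delta = ticks;
    node->next = *link;
    if (node->next != NULL)
    {
        node->next->delta -= ticks; /* successor is now relative to node */
    }
    *link = node;
}

/* One system tick: only the head element is decremented. */
static void deltaTick(Timeout *head)
{
    if (head != NULL && head->delta > 0U)
    {
        head->delta--;
    }
}

/* Unlink and return the head if it has expired, else NULL.
   Call repeatedly to collect timers that expire on the same tick. */
static Timeout *deltaPopExpired(Timeout **head)
{
    Timeout *first = *head;
    if (first == NULL || first->delta != 0U)
    {
        return NULL;
    }
    *head = first->next;
    first->next = NULL;
    return first;
}
```

Inserting (T1,8), (T2,6) and (T3,10) in that order produces the stored sequence <(T2,6), (T1,2), (T3,2)> described above.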
3.2. Sleep Timers
A task can be suspended for an amount of time in ticks, in two distinct manners:
3.2.1. Sleep Delay
The task sleeps for exactly t ticks on every call. Time elapsed between calls is not considered.
Example:
VOID Task1(VOID* args)
{
RK_UNUSEARGS
UINT count = 0;
while (1)
{
logPost("Task1: sleep");
kSleep(300);
/* wake here */
count += 1U;
if (count >= 5)
{
kBusyWait(25); /* spin */
count=0;
/* every 5 activations there will be a drift */
}
}
}
Output:
0 ms :: Task1: sleep
300 ms :: Task1: sleep <-- +300
600 ms :: Task1: sleep <-- +300
900 ms :: Task1: sleep <-- +300
1200 ms :: Task1: sleep <-- +300
1525 ms :: Task1: sleep <-- +325
1825 ms :: Task1: sleep <-- +300
2125 ms :: Task1: sleep <-- +300
2425 ms :: Task1: sleep
2725 ms :: Task1: sleep
3050 ms :: Task1: sleep
3350 ms :: Task1: sleep
3650 ms :: Task1: sleep
3950 ms :: Task1: sleep
4250 ms :: Task1: sleep
4575 ms :: Task1: sleep
3.2.2. Periodic Sleep
This primitive is intended to create periodic activations. The period P (in ticks) is defined at the first kernel call sleepperiod(P), and adjusted internally on subsequent activations, as follows:
Say a task is expected to return from its k-th sleep at T_{k+1} = T_k + P [ticks]. If the task is instead resumed at T_{k+1} = T_k + P + N, the kernel detects this drift of N ticks and sets T_{k+2} = T_{k+1} + P - N.
This can be rewritten as:
T_{k+2} = T_k + P + N + P - N, that is, T_{k+2} - T_k = 2P.
So, compensation follows the law: given two activations T_m and T_n, with m > n, their difference is T_m - T_n = P(m - n) [ticks], where P is the period set at the first call.
Example:
VOID Task1(VOID* args)
{
RK_UNUSEARGS
UINT count = 0;
while (1)
{
logPost("Task1: sleep periodic");
kSleepPeriodic(300);
/* wake here */
count += 1U;
if (count >= 5)
{
kBusyWait(25); /* spin */
count=0;
/* every 5 activations there will be a drift */
}
}
}
Output:
.
.
1200 ms :: Task1: sleep periodic (4P)
1525 ms :: Task1: sleep periodic (> 5P)
1800 ms :: Task1: sleep periodic (6P)
2100 ms :: Task1: sleep periodic (7P)
2400 ms :: Task1: sleep periodic (8P)
2700 ms :: Task1: sleep periodic
3025 ms :: Task1: sleep periodic (>10P)
3300 ms :: Task1: sleep periodic (11P)
3600 ms :: Task1: sleep periodic
3900 ms :: Task1: sleep periodic
4200 ms :: Task1: sleep periodic
4525 ms :: Task1: sleep periodic (> 15P)
4800 ms :: Task1: sleep periodic (16P)
5100 ms :: Task1: sleep periodic
.
.
.
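The compensation rule can be reproduced with a small host-side model. It is a sketch under assumptions: the state layout and the names `PeriodicState`, `periodicDelay` and `simulate` are hypothetical, and an overrun longer than a whole period is not handled. It only illustrates how anchoring each wake-up to the previous expected wake time, rather than to the call time, cancels the drift.

```c
#include <assert.h>

/* Hypothetical per-task state: the period and the last expected wake time. */
typedef struct
{
    unsigned period;
    unsigned lastWake;
} PeriodicState;

/* Ticks to sleep when the call is made at time 'now'.
   If the caller is N ticks late, it sleeps only P - N. */
static unsigned periodicDelay(PeriodicState *s, unsigned now)
{
    assert(s->period > 0U);
    unsigned nextWake = s->lastWake + s->period;
    s->lastWake = nextWake;
    return (nextWake > now) ? (nextWake - now) : 0U;
}

/* Model of the task above: log, sleep periodically, and spin for 'spin'
   ticks once every 'every' activations. log[i] is the time of the i-th log. */
static void simulate(unsigned period, unsigned spin, unsigned every,
                     unsigned *log, unsigned n)
{
    PeriodicState s = { period, 0U };
    unsigned now = 0U;
    unsigned count = 0U;
    for (unsigned i = 0U; i < n; ++i)
    {
        log[i] = now;                  /* logPost()            */
        now += periodicDelay(&s, now); /* periodic sleep       */
        if (++count >= every)
        {
            now += spin;               /* busy-wait: the drift */
            count = 0U;
        }
    }
}
```

Running `simulate(300, 25, 5, log, 12)` gives log times of 1200, 1525 and 1800 for the fifth to seventh entries, matching the 4P, >5P and 6P lines of the output above.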
3.3. Blocking Time-out
These are timers associated with kernel calls that are blocking. Thus, establishing an upper-bound waiting time might benefit them. When the time for unblocking is up, the kernel call returns, indicating a timeout. This value is passed as a number of ticks.
When blocking is associated with a kernel object (other than the Task Control Block), the timeout node stores the address of the object’s waiting queue, so the task can be removed from it if the time expires.
A kernel call is made non-blocking (try semantics) by passing the value RK_NO_WAIT: the function returns immediately if unsuccessful.
The value RK_WAIT_FOREVER suspends a task indefinitely until the condition is satisfied.
In practice, we often either block using RK_WAIT_FOREVER or do not block at all (try semantics, RK_NO_WAIT).
Use a bounded timeout only when you expect occasional misses and you know how to handle them. If a blocking call times out and no recovery plan is feasible, treat it as a system fault (on constrained devices this is usually unrecoverable at runtime; a watchdog is what is left).
Importantly, an ISR shall never block. Indeed, any blocking call from an ISR will hard fault if error checking is enabled.
3.4. Callout Timers (Application Timers)
Timer Control Block |
---|
Option: Reload/One-Shot |
Phase (Initial Delay) |
Callout Function Pointer |
Callout Argument |
Timeout Node |
These are Application Timers that will issue a callback when expiring. In addition to a callout function, an Application Timer receives an initial phase delay and a period and can choose to run once (one-shot) or auto-reload itself.
The callback runs within a System Task with priority 0 and is non-preemptible, which makes the scheduler prioritise it over other tasks. Callouts must be short and non-blocking, as they can cause high CPU contention.
For clarity, Timer Callouts are on a separate list in the kernel, although they share the same TIMEOUT node.
Application Timers (with autoreload) keep track of delays between activations, to preserve phase across calls, as in Periodic Sleep.
Usage examples of Callout Timers are found throughout this docbook.
3.5. System Tick
A dedicated peripheral that generates an interrupt after a defined period provides the kernel time reference. For ARMv6/7M, this peripheral is the built-in SysTick, a 24-bit counter timer. The handler performs some housekeeping on every tick and assesses the need to call a context switch.
The 'housekeeping' accounts for global timer tracking and any tick-dependent condition that might change a task status.
When a timer expires, it might switch a task from WAITING to READY or dispatch a callback. In the case of a callback, this will also trigger a context switch to the TimerHandler System Task, in which the callback is executed and the related timer(s) are appropriately updated.
Note that tasks might switch from WAITING to READY for reasons other than tick-related ones. In these cases, context switching might be triggered immediately if the readied task can preempt the running task.
4. Memory Allocator
Memory Allocator Control Block |
---|
Associated Block Pool |
Number of Blocks |
Block Size |
Number of Free Blocks |
Free Block List |
The standard C library malloc() leads to fragmentation and (also, because of that) is highly non-deterministic. Unless we use it only once, to allocate memory before starting up, it does not fit. But often we need to 'multiplex' memory amongst tasks over time, that is, to dynamically allocate and deallocate.
To avoid fragmentation, we use fixed-size memory blocks. A simple approach would be a static table marking each block as free or taken. With this pattern, you need to 'search' for the next available block, if any. The search time varies and is bounded only by the number of blocks, that is, O(n). To optimise, one approach is to keep track of what is free using a dynamic table: a linked list of addresses. Now we have O(1).
We use "meta-data" to initialise the linked list. Every address holds the "next" address value. All addresses are within the range of a pool of fixed-size blocks. This approach limits the minimal size of a block to the size of a memory address—32 bits for our supported architecture.
Yet, this is the cheapest way to store meta-data. If it were not stored in the free block itself, an extra 32-bit variable would be needed for each block; only then could a block be smaller than 32 bits.
Allocating memory at runtime is a major source of latency (1), non-deterministic behaviour (2), and footprint overhead (3). Design choice: the allocator achieves low-cost, deterministic, fragmentation-free memory management by using fixed-size, word-aligned blocks (1)(2) and embedding metadata within the memory blocks themselves (3). Benefits: the advantages of run-time memory allocation without the real-time drawbacks.
Importantly, the kernel will always round up the block size to the next multiple of 4. Say the user creates a memory pool, assigning blocks to be 6-byte wide; they will turn into 8-byte blocks.
4.1. How it works
When a routine calls alloc(), the address to be returned is the one the "free list" is pointing to, say addr1. Before returning addr1 to the caller, we update the free list to point to the value stored within addr1, say addr8 at that moment.
When a routine calls free(addr1), we overwrite whatever has been written in addr1 with the value the free list points to (if no more alloc() calls were issued, it would still be addr8), and addr1 becomes the free list head again.
Allocating and deallocating fixed-size blocks using this structure and storing meta-data this way is as deterministic (O(1)) and economical as we can get for dynamic memory allocation.
A drawback is that if a routine writes to non-allocated memory within a pool, it will corrupt the meta-data and the Allocator will fail.
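The mechanism described in this section can be sketched in portable C as below. It is an illustration under assumptions, not the RK0 allocator: the names `Pool`, `poolInit`, `poolAlloc` and `poolFree` are hypothetical, and block sizes are rounded up to the size of a pointer so the sketch also runs on a 64-bit host (on the 32-bit targets discussed here that is the same as rounding up to a multiple of 4).

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical pool control block. */
typedef struct
{
    void    *freeList;  /* first free block, or NULL when exhausted */
    unsigned blockSize; /* rounded up so a block can hold a link     */
    unsigned nFree;
} Pool;

/* The link to the next free block lives inside the free block itself. */
static void *loadLink(const void *block)
{
    void *next;
    memcpy(&next, block, sizeof next);
    return next;
}

static void storeLink(void *block, void *next)
{
    memcpy(block, &next, sizeof next);
}

static unsigned roundUpToLink(unsigned n)
{
    unsigned l = (unsigned)sizeof(void *);
    return ((n + l - 1U) / l) * l;
}

/* Chain every block to the next one; the last block links to NULL. */
static void poolInit(Pool *p, void *mem, unsigned blockSize, unsigned nBlocks)
{
    unsigned char *base = mem;
    assert(nBlocks > 0U);
    p->blockSize = roundUpToLink(blockSize);
    p->nFree = nBlocks;
    p->freeList = base;
    for (unsigned i = 0U; i < nBlocks; ++i)
    {
        void *next = (i + 1U < nBlocks)
                         ? (void *)(base + (size_t)(i + 1U) * p->blockSize)
                         : NULL;
        storeLink(base + (size_t)i * p->blockSize, next);
    }
}

/* O(1): pop the head of the free list. */
static void *poolAlloc(Pool *p)
{
    void *block = p->freeList;
    if (block != NULL)
    {
        p->freeList = loadLink(block);
        p->nFree--;
    }
    return block;
}

/* O(1): the freed block becomes the new head of the free list. */
static void poolFree(Pool *p, void *block)
{
    assert(block != NULL);
    storeLink(block, p->freeList);
    p->freeList = block;
    p->nFree++;
}
```

A pool created with 6-byte blocks ends up with 8-byte blocks, and a block that was just freed is the next one handed out, as in the addr1/addr8 walkthrough above.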
5. Inter-Task Communication
Inter-Task Communication (ITC) refers to the mechanisms that enable tasks to coordinate/cooperate/synchronise by means of sending or receiving information that falls into two logical categories: Signals or Messages.
-
Signals: A signal is solely defined by its absence or presence. The meaning is implicit.
-
Messages: When the operations used for tasks to communicate also allow conveying a payload, these mechanisms are regarded as Message Passing.
5.1. Direct Signals (Task Notification)
Within Task Control Block |
---|
Event Register |
Required Flags |
Options |
Each Task Control Block stores a 32-bit Event Register. We define that a 32-bit Signal carries 32 event flags — it can represent a combination of 32 different events, if defining 1 event/bit. A bit set means an event notification is pending to be detected.
Although called Signals, this service does not mimic POSIX or UNIX/BSD Signals, which act as asynchronous software interrupts.
Bitwise friendly, the API is written as set() (as to signal/post) and get() (as to wait/pend).
A task checks for a combination of events it is expecting. This combination can be satisfied if ANY of the required bits are set (OR logic) or if ALL of the required bits are set (AND logic).
If the condition is not met, the task can optionally suspend, switching to the logical state PENDING.
When another task issues a set() whose result satisfies the waiting condition, the task state is then READY.
Upon returning, all required positions have been cleared on the Task’s Event Register.
A set is always an OR operation of an input mask over the current value. 0x00 is invalid for both set() and get() operations.
Additional operations are to query a task’s event register, and for a task to clear its own register.
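The ANY/ALL matching and the clear-on-return rule can be modelled with a few lines of C. This is a sketch with hypothetical names (`FlagsOpt`, `flagsSet`, `flagsTryGet`); in particular, reporting only the matched bits through `got` is an assumption of this model, since the text above does not specify what the kernel hands back.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical option names for this model. */
typedef enum { FLAGS_ANY, FLAGS_ALL } FlagsOpt;

/* set(): always an OR of the input mask over the current value. */
static void flagsSet(uint32_t *eventReg, uint32_t mask)
{
    assert(mask != 0U); /* 0x00 is an invalid mask */
    *eventReg |= mask;
}

/* Non-blocking get(): returns 1 if the condition is met, else 0.
   On success the required positions are cleared in the register. */
static int flagsTryGet(uint32_t *eventReg, uint32_t required,
                       FlagsOpt opt, uint32_t *got)
{
    assert(required != 0U); /* 0x00 is an invalid mask */
    uint32_t matched = *eventReg & required;
    int met = (opt == FLAGS_ALL) ? (matched == required) : (matched != 0U);
    if (met)
    {
        *got = matched;         /* assumption: report the matched bits */
        *eventReg &= ~required; /* clear all required positions        */
    }
    return met;
}
```

For instance, with bits 0 and 2 pending, asking for ALL of bits 0 to 2 is not satisfied, whereas asking for ANY of bits 1 and 2 is satisfied and leaves only bit 0 pending.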
5.1.1. Usage Example: Supervisor Task
One possible usage pattern is for a task’s cycle to begin by checking for any events it is able (or supposed) to handle. Used on a supervisor task, this can create a neat event-driven pattern for a soft/firm real-time system:
typedef struct
{
ULONG pendingBit;
TaskHandle_t dstTask;
ULONG dstSignal;
} Route_t;
static const Route_t routes[] =
{
{
PENDING_AIRFLOW_INCREASE,
airFlowTaskHandle,
AIRFLOW_INCREASE_SIGNAL
},
{
PENDING_TEMP_DECREASE,
tempTaskHandle,
TEMP_DECREASE_SIGNAL
},
/* more routes */
};
VOID SupervisorTask(VOID *args)
{
RK_UNUSEARGS;
while(1)
{
ULONG gotFlags = 0UL;
RK_ERR err = kSignalWait(0xFFFF,
RK_FLAGS_ANY,
&gotFlags,
SUPERVISOR_T_PERIOD);
if (err == RK_SUCCESS && gotFlags != 0)
{
for (ULONG i = 0; i < ARRAY_LEN(routes); ++i)
{
if (gotFlags & routes[i].pendingBit)
{
kSignalSet(routes[i].dstTask, routes[i].dstSignal);
}
}
}
/* if there is anything to do if time out */
}
}
Task Signals are the only ITC primitive that cannot be disabled; thus, they are regarded as a Core Mechanism.
5.2. Semaphores
Semaphore Control Block |
---|
Counter (Unsigned Integer) |
Semaphore Type (Counter/Binary) |
Waiting Queue |
A semaphore S is a nonnegative integer variable, apart from the operations it is subjected to. S is initialized to a nonnegative value. The two operations, called P and V, are defined as follows:
P(S): if S > 0 then S := S-1, else the process is suspended until S > 0.
V(S): if there are processes waiting, then one of them is resumed; else S := S+1.
(Dijkstra, 1968)
Semaphores are public kernel objects for signalling and waiting for events.
The primitive post() is V(S), and pend() is P(S), as described above.
When pend() is issued on a semaphore whose counter is 0, the caller (optionally) switches to a BLOCKED state and is enqueued within the semaphore queue.
After that, every post() issued to the semaphore releases a single task, ordered by priority, until there are no more blocked tasks within the semaphore.
The internal counter will increase above 0 if and only if there are no tasks on the waiting queue when the semaphore is signalled.
5.2.1. Counting and Binary Semaphores
The typical use case for Counting Semaphores is as a "credit tracker": one uses it to verify (wait/pend) and indicate (signal/post) the availability of a countable resource, say, the number of slots within a queue.
A Binary Semaphore is a specialisation: it counts up to 1 and down to 0, so signals do not accumulate. We often say it is either FULL or EMPTY. The typical use case is task-to-task (unilateral or bilateral) or ISR-to-task (unilateral) synchronisation.
In this sense, they overlap the Direct Signals mechanism, which can be seen as a pack of private binary semaphores (only the task itself can pend but any task can post). Binary Semaphores can also be used for mutual exclusion, but this has drawbacks, as will be explained later.
It is neither common nor always desirable, but in RK0 Semaphores can broadcast a signal, using wake(&sema, n), in which at most n tasks will be readied. If n=0 all tasks are readied, and this operation can be aliased as flush(&sema).
Finally, a query() operation on a semaphore will return the number of waiting tasks if any, or the counter value. To differentiate, the number of waiting tasks is returned as a negative value. A nonnegative value is the semaphore’s counter value.
Notes:
-
The post and pend operations are aliased to signal() and wait() respectively, to satisfy those who prefer this nomenclature.
-
If a Binary Semaphore is initialised with a value > 1, the effective value is 1.
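The counter, waiting-queue and query() semantics described in this section can be captured in a small bookkeeping model. It is a sketch only: it tracks numbers rather than real tasks, and the names `SemaModel`, `semaPend`, `semaPost` and `semaQuery` are hypothetical, not the RK0 API.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model: counts only, no real task queue. */
typedef struct
{
    unsigned count;   /* semaphore counter                 */
    unsigned waiters; /* number of tasks that would block  */
    int      binary;  /* non-zero: counter saturates at 1  */
} SemaModel;

/* P(S): returns 1 if acquired, 0 if the caller would block. */
static int semaPend(SemaModel *s)
{
    assert(s != NULL);
    if (s->count > 0U)
    {
        s->count--;
        return 1;
    }
    s->waiters++;
    return 0;
}

/* V(S): release one waiter if any; otherwise increase the counter. */
static void semaPost(SemaModel *s)
{
    assert(s != NULL);
    if (s->waiters > 0U)
    {
        s->waiters--; /* the post is handed over to one waiting task */
        return;
    }
    if (!s->binary || s->count == 0U)
    {
        s->count++;
    }
}

/* Negative: number of waiting tasks. Nonnegative: counter value. */
static int semaQuery(const SemaModel *s)
{
    assert(s != NULL);
    return (s->waiters > 0U) ? -(int)s->waiters : (int)s->count;
}
```

In this model the counter rises above 0 only when a post finds no waiters, and a binary semaphore posted twice in a row still reads 1.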
5.2.1.1. Usage Example: Task-to-Task Bilateral Synchronisation
The snippet below shows two tasks lock-stepping by posting and pending on (binary) semaphores. Task2 depends on Task1 finishing 'work1' to perform 'work2'. And vice-versa.
(Note Direct Signals are a better choice for this use-case.)
RK_SEMA work1Sema;
RK_SEMA work2Sema;
VOID kApplicationInit(VOID)
{
/* semaphores init at 0 */
kSemaInit(&work1Sema, RK_SEMA_BIN, 0);
kSemaInit(&work2Sema, RK_SEMA_BIN, 0);
}
VOID Task1(VOID* args)
{
RK_UNUSEARGS
while (1)
{
doWork1();
kSemaPost(&work1Sema);
kSemaPend(&work2Sema, RK_WAIT_FOREVER);
/* T1 finished. Waiting for T2. */
}
}
VOID Task2(VOID* args)
{
RK_UNUSEARGS
while (1)
{
kSemaPend(&work1Sema, RK_WAIT_FOREVER);
doWork2();
kSemaPost(&work2Sema);
}
}
5.2.2. Mutex Semaphores (Locks)
Mutex Control Block |
---|
Locked State (Boolean) |
Owner |
Waiting Queue |
Mutex Node (list node within the owner TCB) |
Some code regions are critical in that they cannot be accessed by more than one task at once. Acquiring (lock()) a mutex before entering a region and releasing it when leaving makes that region mutually exclusive.
A Mutex is another semaphore specialisation: it can be seen as a binary semaphore with a notion of ownership. When a task successfully acquires a mutex, it is now the owner, and only this task can release it.
If a task tries to acquire an already locked mutex, it switches to the BLOCKED state until the mutex is unlocked by its owner. Then, the highest priority task waiting to acquire the resource is dequeued, as on semaphores.
However, unlike semaphores, the complementary operation, unlock(), when issued by a non-owner, has undefined behaviour. In K0, it will be a hard fault.
Mutexes are solely for mutual exclusion; they cannot be used for signalling. It is common to use Counting Semaphores initialised as 1, or Binary Semaphores, for mutual exclusion.
However, particularly for a Counting Semaphore, if the count increases twice in a row, the mutual exclusion is gone. For both, Priority Inversion can become a problem, as will be explained.
PS: Mutexes in RK0 are not recursive. One cannot make reentrant calls on critical regions.
5.2.2.1. Usage Example: Bounded Buffer
The snippet below shows a consumer-producer pattern for a buffer with K slots (bounded buffer pattern). Two semaphores track the number of slots for the producer and items for the consumer. The mutex prevents any write or read from being disrupted.
RK_SEMA item;
RK_SEMA space;
RK_MUTEX lock;
#define N (K)
typedef struct mesg
{
UINT field1;
UINT field2;
UINT field3;
UINT field4;
} Mesg_t; /* a 16-byte message */
/* a ring buffer of messages */
Mesg_t mailbox[N]={0};
VOID kApplicationInit(VOID)
{
kSemaInit(&item, RK_SEMA_COUNT, 0); /* no items */
kSemaInit(&space, RK_SEMA_COUNT, N); /* N buffers available */
kMutexInit(&lock);
}
/* circular buffer handling omitted */
/* wait for space, lock, write, unlock, signal there is item */
VOID PostMail(Mesg_t* sendPtr)
{
kSemaWait(&space, RK_WAIT_FOREVER);
kMutexLock(&lock, RK_WAIT_FOREVER);
memcpy(&mailbox[tail], sendPtr, sizeof(Mesg_t));
kMutexUnlock(&lock);
kSemaSignal(&item);
}
/* wait for item, lock, read, unlock, signal there is space */
VOID PendMail(Mesg_t* recvPtr)
{
kSemaWait(&item, RK_WAIT_FOREVER);
kMutexLock(&lock, RK_WAIT_FOREVER);
memcpy(recvPtr, &mailbox[head], sizeof(Mesg_t));
kMutexUnlock(&lock);
kSemaSignal(&space);
}
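The circular buffer handling omitted from the snippet above could look like the sketch below. It is only an assumption about how head and tail might be maintained: RING_SLOTS stands in for N, and the helper names (RingIdx, ringNext, ringClaimWrite, ringClaimRead) are hypothetical. There are no full/empty checks because, in the pattern above, the two semaphores already guarantee a free slot before a write and an item before a read.

```c
#include <assert.h>

#define RING_SLOTS 4U /* stands in for N; any positive value works */

typedef struct
{
    unsigned head; /* index of the next slot to read */
    unsigned tail; /* index of the next slot to write */
} RingIdx;

/* Advance an index by one slot, wrapping around at RING_SLOTS. */
unsigned ringNext(unsigned idx)
{
    assert(idx < RING_SLOTS);
    return (idx + 1U) % RING_SLOTS;
}

/* Return the slot a producer should write to, then advance tail. */
unsigned ringClaimWrite(RingIdx *r)
{
    unsigned slot = r->tail;
    r->tail = ringNext(r->tail);
    return slot;
}

/* Return the slot a consumer should read from, then advance head. */
unsigned ringClaimRead(RingIdx *r)
{
    unsigned slot = r->head;
    r->head = ringNext(r->head);
    return slot;
}
```

Both helpers would be called with the mutex held, so that the index update and the copy happen as one step.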
5.2.2.2. Priority Inversion and the Priority Inheritance Protocol
Let TH, TM, and TL be three tasks with priority high (H), medium (M) and low (L), respectively. Say TH is dispatched and blocks on a mutex that TL has acquired (i.e., "TL is blocking TH").
If TM does not need the resource, it will run and preempt TL and, by transitivity, TH.
From now on, TH has an unbounded waiting time, because any task with priority higher than L that does not need the resource indirectly prevents it from being unblocked. Awful.
The Priority Inheritance (PI) Protocol avoids this unbounded waiting. It is characterised by an invariant, simply put:
At any instant a Task assumes the highest priority amongst the tasks it is blocking.
If employed on the situation described above, task TM cannot preempt TL, whose effective priority would have been raised to 'H'.
It is straightforward to reason about this when you consider the scenario of a single mutex.
But when locks nest — that is, more than one critical region — the protocol also needs to be:
-
Transitive: that is, if T1 is blocking T2, and T2 is blocking T3, if T3 has the highest priority, T3 propagates its priority to T1 via T2.
-
A task can own several mutexes at once. Thus, when exiting the critical region it needs to look up each waiting queue, and assume the highest priority. If there are no blocked tasks behind, its nominal priority is then restored. (As tasks are enqueued by priority, it means looking at the task waiting on the head of each waiting queue.)
Below, a demonstration:
/* Task1 has the Highest nominal priority */
/* Task2 has the Medium nominal priority */
/* Task3 has Lowest nominal priority */
/* Note Task3 starts as 1 and 2 are delayed */
RK_DECLARE_TASK(task1Handle, Task1, stack1, STACKSIZE)
RK_DECLARE_TASK(task2Handle, Task2, stack2, STACKSIZE)
RK_DECLARE_TASK(task3Handle, Task3, stack3, STACKSIZE)
RK_MUTEX mutexA;
RK_MUTEX mutexB;
VOID kApplicationInit(VOID)
{
kassert(!kCreateTask(&task1Handle, Task1, RK_NO_ARGS, "Task1", stack1, \
STACKSIZE, 1, RK_PREEMPT));
kassert(!kCreateTask(&task2Handle, Task2, RK_NO_ARGS, "Task2", stack2, \
STACKSIZE, 2, RK_PREEMPT));
kassert(!kCreateTask(&task3Handle, Task3, RK_NO_ARGS, "Task3", stack3, \
STACKSIZE, 3, RK_PREEMPT));
/* mutexes initialised with priority inheritance enabled */
kMutexInit(&mutexA, RK_INHERIT);
kMutexInit(&mutexB, RK_INHERIT);
}
VOID Task3(VOID *args)
{
RK_UNUSEARGS
while (1)
{
printf("@ %lums: [TL] Attempting to LOCK 'A' | Eff: %d | Nom: %d\r\n", kTickGet(),
runPtr->priority, runPtr->prioReal);
kMutexLock(&mutexA, RK_WAIT_FOREVER);
printf("@ %lums: [TL] LOCKED 'A' (in CS) | Eff: %d | Nom: %d\r\n", kTickGet(),
runPtr->priority, runPtr->prioReal);
kBusyWait(60); /* <-- important */
printf("@%lums: [TL] About to UNLOCK 'A' | Eff: %d | Nom: %d\r\n", kTickGet(),
runPtr->priority, runPtr->prioReal);
kMutexUnlock(&mutexA);
printf("--->");
printf("@%lums: [TL] Exit CS | Eff: %d | Nom: %d\r\n", kTickGet(),
runPtr->priority, runPtr->prioReal);
kSleep(4);
}
}
VOID Task2(VOID *args)
{
RK_UNUSEARGS
while (1)
{
kSleep(5);
printf("@%lums: [TM] Attempting to LOCK 'B' | Eff: %d | Nom: %d\r\n", kTickGet(),
runPtr->priority, runPtr->prioReal);
kMutexLock(&mutexB, RK_WAIT_FOREVER);
printf("@%lums: [TM] LOCKED 'B', now trying to LOCK 'A' | Eff: %d | Nom: %d\r\n",
kTickGet(), runPtr->priority, runPtr->prioReal);
kMutexLock(&mutexA, RK_WAIT_FOREVER);
printf("@%lums: [TM] LOCKED 'A' (in CS) | Eff: %d | Nom: %d\r\n", kTickGet(),
runPtr->priority, runPtr->prioReal);
kMutexUnlock(&mutexA);
printf("@%lums: [TM] UNLOCKING 'B' | Eff: %d | Nom: %d\r\n", kTickGet(),
runPtr->priority, runPtr->prioReal);
kMutexUnlock(&mutexB);
printf("--->");
printf("@%lums: [TM] Exit CS | Eff: %d | Nom: %d\r\n", kTickGet(),
runPtr->priority, runPtr->prioReal);
}
}
VOID Task1(VOID *args)
{
RK_UNUSEARGS
while (1)
{
kSleep(2);
printf("@%lums: [TH] Attempting to LOCK 'B'| Eff: %d | Nom: %d\r\n", kTickGet(),
runPtr->priority, runPtr->prioReal);
kMutexLock(&mutexB, RK_WAIT_FOREVER);
printf("@%lums: [TH] LOCKED 'B' (in CS) | Eff: %d | Nom: %d\r\n", kTickGet(),
runPtr->priority, runPtr->prioReal);
kMutexUnlock(&mutexB);
printf("--->");
printf("@%lums: [TH] Exit CS | Eff: %d | Nom: %d\r\n", kTickGet(),
runPtr->priority, runPtr->prioReal);
}
}
Result and comments:
>>>> TL locks 'A'. Higher priority tasks are sleeping. <<<<
@ 14720ms: [TL] Attempting to LOCK 'A' | Eff: 3 | Nom: 3
@ 14720ms: [TL] LOCKED 'A' (in CS) | Eff: 3 | Nom: 3
@14721ms: [TM] Attempting to LOCK 'B' | Eff: 2 | Nom: 2
>>>> TM acquires 'B' and is blocked by TL on 'A'. TL inherits TM's priority. <<<<
@14721ms: [TM] LOCKED 'B', now trying to LOCK 'A' | Eff: 2 | Nom: 2
>>>> TH will be blocked by TM on 'B': <<<<
@14722ms: [TH] Attempting to LOCK 'B'| Eff: 1 | Nom: 1
>>>> TM inherits TH's priority. TL inherits TH's priority via TM. <<<<
@14780ms: [TL] About to UNLOCK 'A' | Eff: 1 | Nom: 3
>>>> Upon unlocking 'A', TL is preempted by TM. It means TL's priority has been restored, as it is no longer blocking a higher priority task. <<<<
>>>> Now TM acquires 'A' <<<<
@14780ms: [TM] LOCKED 'A' (in CS) | Eff: 1 | Nom: 2
>>>> After releasing 'A', but before releasing 'B', TM's priority is still '1', as it is blocking TH while holding 'B'. <<<<
@14780ms: [TM] UNLOCKING 'B' | Eff: 1 | Nom: 2
>>>> Upon unlocking 'B' TM is preempted by TH. (TM's priority has been restored.) <<<<
@14780ms: [TH] LOCKED 'B' (in CS) | Eff: 1 | Nom: 1
>>> RESULT: even though priority inversion was enforced, tasks leave the nested lock ordered by their nominal priority. <<<
--->@14780ms: [TH] Exit CS | Eff: 1 | Nom: 1
--->@14780ms: [TM] Exit CS | Eff: 2 | Nom: 2
--->@14780ms: [TL] Exit CS | Eff: 3 | Nom: 3
Importantly, the worst-case time is bounded by the time the lowest priority task holds a lock (60 ms in the example: 14720ms → 14780ms).
As for each priority update we check each waiting queue for each mutex a task owns, the time complexity is linear in owner*mutex. But typically no task ever holds more than a few mutexes. Still, one should not nest locks unless it is needed.
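The priority update described above can be modelled as a scan over the head waiter of every mutex a task owns. The sketch below is a single-threaded illustration, not RK0 code: PiTask, PiMutex and piUpdatePrio are hypothetical names, and, as in the demonstration, a lower number means a higher priority. Transitivity falls out of using the waiter's effective priority rather than its nominal one.

```c
#include <assert.h>
#include <stddef.h>

#define PI_MAX_OWNED 4 /* arbitrary bound for the model */

struct PiMutex;

typedef struct PiTask
{
    int nominal;   /* assigned priority; lower value = higher priority */
    int effective; /* priority after inheritance */
    struct PiMutex *owned[PI_MAX_OWNED]; /* mutexes currently held */
    int nOwned;
} PiTask;

typedef struct PiMutex
{
    PiTask *headWaiter; /* highest-priority blocked task, NULL if none */
} PiMutex;

/* Recompute the effective priority: the highest priority among the
   task itself and the head waiter of each mutex it owns. */
void piUpdatePrio(PiTask *t)
{
    assert(t != NULL && t->nOwned <= PI_MAX_OWNED);
    int best = t->nominal;
    for (int i = 0; i < t->nOwned; i++)
    {
        const PiTask *w = t->owned[i]->headWaiter;
        if (w != NULL && w->effective < best)
        {
            best = w->effective;
        }
    }
    t->effective = best;
}
```

Replaying the demonstration: with TM waiting on 'A', updating TL yields 2; once TH waits on 'B', updating TM and then TL yields 1 for both; when TL releases 'A' and owns nothing, the update restores its nominal 3.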
Mutexes vs Binary Semaphores
There is (or used to be) a lot of fuss about whether binary semaphores are appropriate to use as locks. As a practical guideline, if all tasks sharing the resource have the same priority, using a binary semaphore can be appropriate — because a binary semaphore is considerably faster. It all depends on the case.
The drawback is the lack of ownership: any task can accidentally release the resource. On a large codebase, this can become a real problem. Nonetheless, this is a problem for semaphores in general.
For tasks with different priorities, binary semaphores should never be considered for mutual exclusion unless priority inversion is not a problem (how?).
Counting semaphores initialised as 1 are too risky. Besides priority inversion, if the count ever increases above 1, mutual exclusion is lost, and multiple tasks can enter the critical section at once.
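To make the risk concrete, here is a toy model of the two semaphore flavours. It assumes that a binary semaphore saturates at 1 while a counting semaphore keeps incrementing; SemModel, semModelSignal and semModelTryWait are hypothetical names, not RK0 services.

```c
#include <assert.h>

typedef struct
{
    unsigned count; /* current value */
    unsigned max;   /* 1 for a binary semaphore, larger for counting */
} SemModel;

/* Signal: increment the count, saturating at max. */
void semModelSignal(SemModel *s)
{
    assert(s->max > 0U);
    if (s->count < s->max)
    {
        s->count++;
    }
}

/* Non-blocking wait: returns 1 if the semaphore was taken, 0 otherwise. */
int semModelTryWait(SemModel *s)
{
    if (s->count > 0U)
    {
        s->count--;
        return 1;
    }
    return 0;
}
```

With a counting semaphore created as {1, 8}, one stray signal raises the count to 2 and two consecutive waits succeed, so two tasks are inside the critical region. With a binary semaphore {1, 1}, the stray signal is absorbed and the second wait fails.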
5.3. Scheduler Lock
Often, we need a task to perform operations without being preempted. A mutex serialises access to a code region but does not prevent a task from being preempted while operating on data. Depending on the case, this can lead to inconsistent data state.
An aggressive way is to disable interrupts globally. For kernel services often it is the only way to keep data integrity. On the higher level it is feasible for very short operations and/or when you need to protect data from interrupts altogether.
A less aggressive approach is to make the task non-preemptible with kSchLock() before entering the critical region and kSchUnlock() when leaving. This way, interrupts are still being sensed, and even higher-priority tasks might switch to a ready state, but the running thread will not be preempted.
The priority inversion it potentially causes is bounded. If a higher-priority task is readied while the scheduler is locked, the context switch happens immediately after unlocking.
Note that locking/unlocking the scheduler disables global interrupts for the time needed to increment/decrement a counter. Therefore, if your atomic operation is as short as that (3 to 4 cycles), disabling/enabling global interrupts is the better alternative.
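The counter-based behaviour can be pictured with the model below: while the lock depth is non-zero, a readied higher-priority task only marks a switch as pending, and the switch happens when the depth returns to zero. This is a sketch under assumptions, not the kernel code; in particular, nesting via a depth counter is inferred from the text, and SchedModel and its functions are hypothetical names.

```c
#include <assert.h>

typedef struct
{
    unsigned lockDepth;   /* > 0 means the scheduler is locked */
    int switchPending;    /* a higher-priority task became ready meanwhile */
    unsigned switchCount; /* number of context switches performed */
} SchedModel;

/* Enter a non-preemptible region. */
void schedModelLock(SchedModel *s)
{
    s->lockDepth++;
}

/* A higher-priority task was readied (e.g., by an ISR). */
void schedModelReadyHigher(SchedModel *s)
{
    if (s->lockDepth > 0U)
    {
        s->switchPending = 1; /* defer: the running task keeps the CPU */
    }
    else
    {
        s->switchCount++;     /* preempt right away */
    }
}

/* Leave the region; a deferred switch runs when the depth reaches zero. */
void schedModelUnlock(SchedModel *s)
{
    assert(s->lockDepth > 0U);
    s->lockDepth--;
    if (s->lockDepth == 0U && s->switchPending)
    {
        s->switchPending = 0;
        s->switchCount++;
    }
}
```

The bounded inversion mentioned above is visible here: the deferred switch is delayed by exactly the length of the locked region.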
To add to the discussion, when two threads need to access the same data to 'read-modify-write', a lock-free mechanism is the LDREX/STREX operations of ARMv7-M (or, more generally, C11 atomics). They do not avoid preemption and, particularly in ARMv7-M, if the data is touched by an ISR before the store-exclusive concludes, the ownership is lost. They are typically used for multi-core spin-locking.
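As an illustration of the C11 route, the sketch below performs a lock-free read-modify-write with a compare-exchange retry loop from <stdatomic.h>. The function name atomicAddSaturate and the saturating policy are just an example; on ARMv7-M, compilers typically lower this kind of loop to an LDREX/STREX pair, but that depends on the toolchain.

```c
#include <assert.h>
#include <stdatomic.h>

/* Atomically add delta to *counter without exceeding limit.
   Returns the value that was stored. */
unsigned atomicAddSaturate(atomic_uint *counter, unsigned delta, unsigned limit)
{
    unsigned old = atomic_load(counter);
    unsigned desired;
    do
    {
        /* compute the new value from the snapshot in 'old' */
        if (delta > limit || old > limit - delta)
        {
            desired = limit;
        }
        else
        {
            desired = old + delta;
        }
        /* on failure, 'old' is refreshed with the current value and
           the computation is retried */
    } while (!atomic_compare_exchange_weak(counter, &old, desired));
    return desired;
}
```

If another context modifies the counter between the load and the store, the exchange fails and the loop simply recomputes, which is the "ownership is lost" case handled in software.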
5.4. Sleep Queues
Event Control Block |
---|
Waiting Queue |
The RK_SLEEP_QUEUE object is simply a queue of tasks sleeping while waiting for a signal/wake operation on them. That could be read as 'tasks sleeping until they are signalled that an event has happened'.
That’s why this primitive was formerly called RK_EVENT. Naturally, we might name the queue to indicate the event, which normally is a state (e.g., notFull, notEmpty) or the action it triggers (e.g., goWriters, goReaders).
An RK_SLEEP_QUEUE object does not have any records to indicate if an associated event has ever happened.
Thus, a call to wait(&sleepq, timeout) always puts the caller task to sleep. Note that using RK_NO_WAIT on this primitive is meaningless, because there is nothing to 'try'. The call will just return.
A signal(&sleepq) will wake up a single task: the one with the highest priority. A wake(&sleepq, n, &r) is a broadcast: at most n sleeping tasks will switch to READY. r will store the number of remaining tasks, if any.
To wake all tasks, one either sets n=0 or uses the flush(&sleepq) helper.
A query(&sleepq) operation returns the number of sleeping tasks.
Finally, as any synchronisation needs an associated waiting queue, RK_SLEEP_QUEUE is a building block for high-level synchronisation schemes.
To provide more flexibility, there is an option to bypass queue discipline and ready an arbitrary task identified by its Task Handle, using ready(&sleepq, taskHandle).
The stateless characteristic of Sleep Queues makes them a very limited mechanism to be used alone, as they are prone to lost wake-up signals. There are suitable cases, all simple unilateral cases. Yet, their main purpose is to be a building block for Monitors, as we will see.
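The stateless behaviour and the lost wake-up can be seen in a toy model that stores only the priorities of the sleeping tasks. Nothing here is RK0 code: SqModel, sqWait, sqSignal and sqWake are hypothetical names, lower numbers mean higher priority, and a return of -1 from sqSignal stands for a signal that found nobody to wake and left no trace.

```c
#include <assert.h>

#define SQ_MAX 8U /* arbitrary capacity for the model */

typedef struct
{
    int prio[SQ_MAX]; /* priorities of the sleeping tasks, unsorted */
    unsigned n;       /* number of sleeping tasks */
} SqModel;

/* wait: the caller always goes to sleep; returns 0 if the model is full. */
int sqWait(SqModel *q, int prio)
{
    if (q->n == SQ_MAX)
    {
        return 0;
    }
    q->prio[q->n++] = prio;
    return 1;
}

/* signal: wake the highest-priority sleeper and return its priority,
   or -1 if nobody is sleeping (the signal is lost, nothing is recorded). */
int sqSignal(SqModel *q)
{
    if (q->n == 0U)
    {
        return -1;
    }
    unsigned best = 0U;
    for (unsigned i = 1U; i < q->n; i++)
    {
        if (q->prio[i] < q->prio[best])
        {
            best = i;
        }
    }
    int woken = q->prio[best];
    q->n--;
    q->prio[best] = q->prio[q->n]; /* fill the hole with the last entry */
    return woken;
}

/* wake: wake at most n sleepers (n == 0 wakes all); stores how many
   remain asleep and returns how many were woken. */
unsigned sqWake(SqModel *q, unsigned n, unsigned *remaining)
{
    unsigned woken = 0U;
    while (q->n > 0U && (n == 0U || woken < n))
    {
        (void)sqSignal(q);
        woken++;
    }
    *remaining = q->n;
    return woken;
}
```

A signal issued on an empty queue returns -1, and a task that calls sqWait right afterwards still sleeps: this is the lost wake-up that a predicate checked under a mutex is meant to prevent.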
5.5. Monitors
Monitors (Hansen) were originally a programming language feature (in Concurrent Pascal), encapsulating conditional synchronisation and mutual exclusion. The mutual exclusion was enforced by language and there was a single waiting queue for a monitor.
A Monitor invariant is that a single task can be active — a task can go inactive either by suspension or when leaving the monitor procedure. When a task is signalled it is supposed to wake up, and now this standoff has to be solved.
5.5.1. Monitor Semantics
To keep the invariant, there are at least three approaches. (I am not aware of any other).
-
Signal-and-leave (Hansen)
The signaller task will signal another task and immediately leave. It was the first proposal, as there was a single waiting queue associated with a monitor.
-
Signal-and-wait (Hoare)
In this case, a task signals and blocks itself on an 'urgent queue'. When the waiter exits the monitor or blocks again, the signaller resumes.
-
Signal-and-continue (Mesa)
A signal is issued while holding a mutex that the signalled task must also acquire. Thus a signaller is not forced to either leave or suspend itself, but it does need to release the mutex before going inactive.
(Mesa is not a person; it was a Xerox programming language.)
So a task signals and can remain active in the Monitor. Between the moment a condition triggers a signal and the moment the signalled task reacts, we can have something between 1ns and 400 years. We need to check the condition again upon waking in the monitor: the famous test-loop while(!condition) { wait(); } is a characteristic of Mesa Monitors.
5.5.2. Monitor-like patterns in RK0
To appreciate the discussion on Condition Variables that follows, let’s build a Monitor-like solution for the producer-consumer problem.
5.5.2.1. Producer-Consumer Solution using a Mesa Monitor
/* PSEUDO CODE */
/* MONITOR ADT:
DATA
UINT currentItemNum : number of items in the buffer
UINT const maxItemNum : buffer capacity (slots)
RK_MUTEX mutex : monitor lock
RK_SLEEP_QUEUE waitingItem : tasks waiting for an item
RK_SLEEP_QUEUE waitingSlot : tasks waiting for a slot
PROCEDURES
VOID INSERT(ITEM_t) : insert an item
VOID EXTRACT(ITEM_t*) : extract an item
*/
/**********************************************
Producer-Consumer Solution with a Mesa Monitor
***********************************************/
/* MONITOR PROCEDURES */
/* INSERT AN ITEM */
MONITOR.INSERT(ITEM_t item)
{
LOCK(&mutex); /* lock monitor */
/* condition for producers is having
free slots */
while (!(currentItemNum < maxItemNum))
{
/* is full */
DISABLE_PREEMPTION(); /* disable scheduler */
UNLOCK(&mutex);
WAIT(&waitingSlot);
ENABLE_PREEMPTION();
/***********************************************
!IMPORTANT!: if preemption was allowed and unlock(&mutex) switched a higher priority task to READY, the active task would be preempted,
compromising the entire synch logic.
*************************************************/
/* when waking acquire mutex again */
LOCK(&mutex);
}
/* buffer has slots, mutex is held */
depositItem(item);
/* increase number of items */
currentItemNum++;
/* signal any tasks waiting for an item */
SIGNAL(&waitingItem);
/* release monitor and leave */
UNLOCK(&mutex);
return;
}
/* PROCEDURE: EXTRACT AN ITEM */
MONITOR.EXTRACT(ITEM_t* item)
{
LOCK(&mutex); /* lock monitor */
while (!(currentItemNum > 0))
{
/* is empty */
DISABLE_PREEMPTION();
UNLOCK(&mutex);
WAIT(&waitingItem);
ENABLE_PREEMPTION();
/* when waking acquire mutex again */
LOCK(&mutex);
}
*item = extractItem();
/* decrease number of items */
currentItemNum--;
/* signal any tasks waiting for a free slot */
SIGNAL(&waitingSlot);
/* release monitor and leave */
UNLOCK(&mutex);
return;
}
There is plenty happening in the above pattern: the correct combination of locking, unlocking, sleeping and signalling under a predicate is what enforces the correct precedence of tasks accessing a shared resource.
Note that mutex LOCKs are ordered to guarantee a single task owns the currentItemNum variable. Importantly, when signalling a producer that the number of items decreased (or the number of slots increased), even if the producer has a higher priority and is dispatched, it will block trying to LOCK(&mutex) when resuming within while(!notFull).
The UNLOCK-WAIT sequence within the testing loop has preemption disabled because, after releasing the lock, the task cannot be allowed to resume within the monitor again for any reason other than the monitor predicate being satisfied.
Lost signal?
What if the active task in the Monitor changes currentItemNum and is preempted before being able to signal a waiting task (a consumer if incrementing, or a producer when decrementing)? Since it owns the mutex lock, it will resume to find the Monitor in the same state as when preemption happened, consistent with the actual state of the bounded buffer (assuming the bounded buffer is only being accessed through the Monitor).
5.5.3. Condition Variables in RK0
The producer-consumer problem using a Mesa Monitor shown above consists of two procedures (plus initialisation) that encapsulate many synchronisation details.
In RK0, any association of an RK_MUTEX and one or more RK_SLEEP_QUEUE objects can be treated as condition variables, using the helpers
-
kCondVarWait(&sleepq, &mutex, timeout)
-
kCondVarSignal(&sleepq)
-
kCondVarBroadcast(&sleepq)
that follow the same semantics as Pthreads Condition Variables, which in turn are aligned with Mesa Monitors.
When using the helpers, a testing-loop is written as:
while(!condition)
{
/*unlock-sleep, atomic:*/
kCondVarWait(&sleepq, &mutex, timeout);
/* when waking it issues lock(&mutex) */
}
/* if here, condition is true and mutex is locked */
The examples below clarify how Monitor-like schemes can be constructed in RK0.
5.5.3.1. Usage Example: Synchronisation Barrier
A given number of tasks must reach a point in the program before all can proceed, so every task calls BarrierWait(&barrier) to catch up with the set of tasks it must synchronise with.
The last task entering the barrier will broadcast a signal to all tasks waiting for the wake condition.
At any moment within a Monitor a single task is RUNNING (which also happens to be a kernel invariant); all other tasks within the monitor are either SLEEPING (for some condition) or BLOCKED (on a mutex).
/* Synchronisation Barrier */
typedef struct
{
RK_MUTEX lock;
RK_SLEEP_QUEUE allSynch;
UINT count; /* number of tasks in the barrier */
UINT round; /* increased every time all tasks synch */
UINT nRequired; /* number of tasks required */
} Barrier_t;
VOID BarrierInit(Barrier_t *const barPtr, UINT nRequired)
{
kMutexInit(&barPtr->lock, RK_INHERIT);
kEventInit(&barPtr->allSynch);
barPtr->count = 0;
barPtr->round = 0;
barPtr->nRequired = nRequired;
}
VOID BarrierWait(Barrier_t *const barPtr)
{
UINT myRound = 0;
kMutexLock(&barPtr->lock, RK_WAIT_FOREVER);
/* save round number */
myRound = barPtr->round;
/* increase count on this round */
barPtr->count++;
if (barPtr->count == barPtr->nRequired)
{
/* reset counter, inc round, broadcast to sleeping tasks */
barPtr->round++;
barPtr->count = 0;
kCondVarBroadcast(&barPtr->allSynch);
}
else
{
/* a proper wake signal might happen after inc round */
while ((UINT)(barPtr->round - myRound) == 0U)
{
RK_ERR err = kCondVarWait(&barPtr->allSynch, &barPtr->lock, RK_WAIT_FOREVER);
kassert(err==RK_SUCCESS);
}
}
kMutexUnlock(&barPtr->lock);
}
#define N_REQUIRED 3
Barrier_t syncBarrier;
VOID kApplicationInit(VOID)
{
kassert(!kCreateTask(&task1Handle, Task1, RK_NO_ARGS, "Task1", stack1, STACKSIZE, 2, RK_PREEMPT));
kassert(!kCreateTask(&task2Handle, Task2, RK_NO_ARGS, "Task2", stack2, STACKSIZE, 3, RK_PREEMPT));
kassert(!kCreateTask(&task3Handle, Task3, RK_NO_ARGS, "Task3", stack3, STACKSIZE, 1, RK_PREEMPT));
BarrierInit(&syncBarrier, N_REQUIRED);
}
VOID Task1(VOID* args)
{
RK_UNUSEARGS
while (1)
{
kPuts("Task 1 is waiting at the barrier...\r\n");
BarrierWait(&syncBarrier);
kPuts("Task 1 passed the barrier!\r\n");
kSleep(8);
}
}
VOID Task2(VOID* args)
{
RK_UNUSEARGS
while (1)
{
kPuts("Task 2 is waiting at the barrier...\r\n");
BarrierWait(&syncBarrier);
kPuts("Task 2 passed the barrier!\r\n");
kSleep(5);
}
}
VOID Task3(VOID* args)
{
RK_UNUSEARGS
while (1)
{
kPuts("Task 3 is waiting at the barrier...\r\n");
BarrierWait(&syncBarrier);
kPuts("Task 3 passed the barrier!\r\n");
kSleep(3);
}
}
5.5.3.2. Usage Example: Readers Writers Lock
Several readers and writers share a piece of memory. Readers can concurrently access the memory to read; a single writer is allowed (otherwise, data would be corrupted).
When a writer finishes, it checks for any readers waiting. If there are, the writer flushes the readers' waiting queue. If not, it wakes a single writer, if any. When the last reader finishes, it signals a writer.
Every read or write operation begins with an acquire and finishes with a release.
/* RW-Lock */
/* a single writer is allowed if there are no readers */
/* several readers are allowed if there is no writer*/
typedef struct
{
RK_MUTEX lock;
RK_SLEEP_QUEUE writersGo;
RK_SLEEP_QUEUE readersGo;
INT rwCount; /* number of active readers if > 0 */
/* active writer if -1 */
}RwLock_t;
VOID RwLockInit(RwLock_t *const rwLockPtr)
{
kMutexInit(&rwLockPtr->lock, RK_INHERIT);
kEventInit(&rwLockPtr->writersGo);
kEventInit(&rwLockPtr->readersGo);
rwLockPtr->rwCount = 0;
}
/* A writer can acquire if rwCount = 0 */
/* An active writer is indicated by rwCount = -1; */
VOID RwLockAcquireWrite(RwLock_t *const rwLockPtr)
{
kMutexLock(&rwLockPtr->lock, RK_WAIT_FOREVER);
/* if different than 0, there are either writers or readers */
/* sleep to be signalled */
while (rwLockPtr->rwCount != 0)
{
kCondVarWait(&rwLockPtr->writersGo, &rwLockPtr->lock, RK_WAIT_FOREVER);
/* mutex is locked when waking up*/
}
/* woke here, set an active writer */
rwLockPtr->rwCount = -1;
kMutexUnlock(&rwLockPtr->lock);
}
/* a writer releases, waking up all waiting readers, if any */
/* if there are no readers, a writer can get in */
VOID RwLockReleaseWrite(RwLock_t *const rwLockPtr)
{
kMutexLock(&rwLockPtr->lock, RK_WAIT_FOREVER);
rwLockPtr->rwCount = 0; /* indicate no writers*/
/* if there are waiting readers, flush */
ULONG nWaitingReaders=0;
kEventQuery(&rwLockPtr->readersGo, &nWaitingReaders);
if (nWaitingReaders > 0)
{
/* condVarBroadcast is just an alias for an event flush */
kEventFlush(&rwLockPtr->readersGo);
}
else
{
/* wake up a single writer if any */
kEventSignal(&rwLockPtr->writersGo);
}
kMutexUnlock(&rwLockPtr->lock);
}
/* a reader can acquire if there are no writers */
VOID RwLockAcquireRead(RwLock_t *const rwLockPtr)
{
kMutexLock(&rwLockPtr->lock, RK_WAIT_FOREVER);
/* if there is an active writer, sleep */
while (rwLockPtr->rwCount < 0)
{
kCondVarWait(&rwLockPtr->readersGo, &rwLockPtr->lock, RK_WAIT_FOREVER);
/* mutex is locked when waking up*/
}
/* increase rwCount, so its > 0, indicating readers */
rwLockPtr->rwCount ++;
kMutexUnlock(&rwLockPtr->lock);
}
/* a reader releases and wakes a single writer */
/* if it is the last reader */
VOID RwLockReleaseRead(RwLock_t *const rwLockPtr)
{
kMutexLock(&rwLockPtr->lock, RK_WAIT_FOREVER);
rwLockPtr->rwCount --;
if (rwLockPtr->rwCount == 0)
{
kEventSignal(&rwLockPtr->writersGo);
}
kMutexUnlock(&rwLockPtr->lock);
}
In the image below, four tasks (a fast writer, Task 1; a slow writer, Task 4; and two readers, with Task3 faster than Task2) read from and write to a shared UINT variable:

MESSAGE PASSING
In real-time applications, Message Passing often encounters the following scenarios:
-
Some messages are consumed by tasks that can’t do anything before processing information; thus, these messages end up also being signals. For example, a server needs a command to process (so it blocks) and/or a client blocks for an answer.
-
A particular case of the above scenario is fully synchronous: client and server run in lockstep.
-
Two tasks with different rates need to communicate, and cannot lockstep. A faster producer might use a buffer to accommodate a relatively small burst of generated data, or a quicker consumer will drop repeated received data.
-
Other times, we need to correlate data with time for processing, so using a queue gives us the idea of data motion. E.g., when calculating the mean value of a transducer over a given period.
-
Past data is useless for real-time tasks such as servo-control loops. Consumers need the most recent data for processing. For example, a drive-by-wire system or a robot avoiding obstacles. In these cases, message passing must be lock-free while guaranteeing data integrity.
5.6. Mailbox
Mailbox Control Block |
---|
Mail Address |
Waiting queue |
Owner Task* |
In GPOS jargon, mailboxes are queues of messages—a distinction from pipes (which are byte streams)—but in embedded system software, mailboxes are often said to have a capacity of a single item. More recently, you will not find it as a distinct mechanism—you use a 1-item queue.
A Mailbox allows a task to exclusively write (post) and read (pend) a memory region and to be notified when another task writes or reads to it. Therefore, its typical operation provides mutual exclusion and notification: very handy.
A message within a mailbox is the address of an object. The sender and receiver agree on the concrete mail implementation as part of the mail interface contract; also, the data has to remain unchanged until the receiver 'consumes' it. That is another part of the contract.
The semantics are simple: a Mailbox will be EMPTY when its storage points to NULL; otherwise, it is FULL. It will be EMPTY (FULL) after a successful pend() (post()) operation.
When a producer issues a post() to a FULL mailbox, it (optionally) blocks and is placed in the Mailbox waiting queue. The associated task will switch to the state SENDING.
Likewise, a consumer (optionally) blocks when issuing a pend() on an empty Mailbox. The task status switches to RECEIVING, and the task is enqueued in the mailbox waiting queue.
Other methods are peek(), to read a message without removing it (the Mailbox will continue as FULL), and postovw(), to overwrite whatever message is in the Mailbox. These two methods are often paired when a Mailbox is supposed to be a message container several tasks will read from, and the message stays until overwritten. Note that in these cases, copying to/from the mailbox is desirable, as it is hard to keep track of the data life cycle when many tasks can read/write.
* We will discuss ownership in message passing later.
Passing messages by reference is a typical “embedded thing” because it is cheap, deterministic and DMA-friendly.
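The EMPTY/FULL semantics described above fit in a few lines when blocking is left out. The sketch below is a non-blocking model, not the RK0 service: MboxModel and the mboxTry*/mboxPeek/mboxPostOvw functions are hypothetical names, and a return of 0 marks the point where a real task would optionally block.

```c
#include <assert.h>
#include <stddef.h>

typedef struct
{
    void *mail; /* NULL means EMPTY, anything else means FULL */
} MboxModel;

/* post: succeeds only on an EMPTY mailbox; the mailbox becomes FULL. */
int mboxTryPost(MboxModel *m, void *mail)
{
    if (mail == NULL || m->mail != NULL)
    {
        return 0;
    }
    m->mail = mail;
    return 1;
}

/* pend: succeeds only on a FULL mailbox; the mailbox becomes EMPTY. */
int mboxTryPend(MboxModel *m, void **out)
{
    if (m->mail == NULL)
    {
        return 0;
    }
    *out = m->mail;
    m->mail = NULL;
    return 1;
}

/* peek: read the message and leave the mailbox FULL. */
int mboxPeek(const MboxModel *m, void **out)
{
    if (m->mail == NULL)
    {
        return 0;
    }
    *out = m->mail;
    return 1;
}

/* postovw: replace whatever is stored; the mailbox ends up FULL. */
void mboxPostOvw(MboxModel *m, void *mail)
{
    assert(mail != NULL);
    m->mail = mail;
}
```

Since NULL doubles as the EMPTY marker, a NULL message cannot be posted in this model.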
5.6.1. Example: Multi-client-server synchronous command-response
The snippet below presents two clients and one server in lock-step communication. It is a (local) procedure call, although the server simply echoes back what it has received. Logically, this communication is unbuffered.
/* This example includes <string.h> for convenience */
RK_MBOX serverReqMbox; /* server incoming commands */
RK_MBOX serverAckMbox; /* server incoming response acks */
RK_MBOX clientMbox1; /* response for client 1 */
RK_MBOX clientMbox2; /* response for client 2 */
/* Command Requests are assembled on an Application Data Unit */
typedef struct
{
BYTE length; /* Length of the APDU payload */
BYTE payload[32]; /* APDU payload */
RK_MBOX *replyMbox; /* Pointer to the client's reply mailbox */
} APDU RK_ALIGN(4);
void kApplicationInit(VOID)
{
kMboxInit(&serverReqMbox, NULL);
kMboxInit(&serverAckMbox, NULL);
kMboxInit(&clientMbox1, NULL);
kMboxInit(&clientMbox2, NULL);
}
/* Highest Priority */
/* the server response is to ECHO the request back to the client; then, it pends on a mailbox waiting for the client to acknowledge the response. So it proceeds to process further requests. */
VOID ServerTask(VOID* args)
{
RK_UNUSEARGS
APDU *request, response;
UINT* ackResp;
while (1)
{
/* Wait for a request */
if (kMboxPend(&serverReqMbox, (VOID **)&request, RK_WAIT_FOREVER) == RK_SUCCESS)
{
kprintf("[SERVER] RECV: %s\r\n", request->payload);
/* Process the request */
response.length = (BYTE) snprintf((char*) response.payload,
sizeof(response.payload), "ECHO %s",
request->payload);
/* Echo to client's reply mailbox */
if (kMboxPost(request->replyMbox, &response, RK_WAIT_FOREVER) != RK_SUCCESS)
{
kprintf("ECHO fail\r\n");
}
if (kMboxPend(&serverAckMbox, (VOID **)&ackResp, RK_WAIT_FOREVER) == RK_SUCCESS)
kprintf("[SERVER] CLIENT %d SERVED.\r\n", *ackResp);
/* now it is safe to process another request */
}
}
}
/* same priority as Client2 */
VOID Client1Task(VOID* args)
{
RK_UNUSEARGS
APDU request, *response;
while (1)
{
/* Prepare the request */
snprintf((char*) request.payload, sizeof(request.payload),
"Hello from Client 1");
request.length = (BYTE) strlen((char*) request.payload);
request.replyMbox = &clientMbox1; /* Specify the reply mailbox */
/* Send the request to the server */
if (kMboxPost(&serverReqMbox, &request, RK_WAIT_FOREVER) == RK_SUCCESS)
{
/* Wait for the response */
if (kMboxPend(&clientMbox1, (VOID **)&response, RK_WAIT_FOREVER)
== RK_SUCCESS)
{
kprintf("[CLIENT #1] RECV: %s\r\n", response->payload);
/* we do not use a simple direct signal or binary semaphore because the client has to block if the server still has an ACK message waiting to be retrieved */
UINT ack=1;
kMboxPost(&serverAckMbox, &ack, RK_WAIT_FOREVER);
/* now it is safe to send another request */
}
else
{
kprintf("1F\r\n");
}
}
else
{
kprintf("1F\r\n");
}
}
}
VOID Client2Task(VOID* args)
{
RK_UNUSEARGS
APDU request, *response;
while (1)
{
/* Prepare the request */
snprintf((char*) request.payload, sizeof(request.payload),
"Hello from Client 2");
request.length = (BYTE) strlen((char*) request.payload);
request.replyMbox = &clientMbox2; /* Specify the reply mailbox */
/* Send the request to the server */
if (kMboxPost(&serverReqMbox, &request, RK_WAIT_FOREVER) == RK_SUCCESS)
{
/* Wait for the response */
if (kMboxPend(&clientMbox2, (VOID **)&response, RK_WAIT_FOREVER)
== RK_SUCCESS)
{
kprintf("[CLIENT #2] RECV: %s\r\n", response->payload);
UINT ack=2;
kMboxPost(&serverAckMbox, &ack, RK_WAIT_FOREVER);
}
else
{
kprintf("2FAIL\r\n");
}
}
else
{
kprintf("2FAIL\r\n");
}
}
}

Had the server not blocked waiting for an ACK, the former response would have been overwritten before a client could have read it, given how priorities are set. To accommodate two clients while still passing by reference, the server would need to keep the response on different buffers.
If a copy were passed as a response, the server would not need to block for an ACK, provided the response was sent before receiving another request.
5.7. Message Queues
The classic Message Queue on UNIX SVR4 is defined as the 'head of a linked list of messages'. Some RTOSes implement Message Queues using linked lists, in which case a central pool of buffers might exist.
In RK0, two mechanisms are present:
-
A Mail Queue (or a Queue) is a 'multi-item' Mailbox: it holds multiple generic pointers as messages.
-
A Stream Queue (or a Stream) is a ring buffer of N fixed-size messages (word-aligned). Streams perform deep copies - from sender storage to the stream buffer, and from the stream buffer to receiver storage.
They are offered as different mechanisms because they have different best-use cases. Still, one probably does not need both, as one can manipulate Mail Queues to pass by copy, and Stream Queues to pass by pointers.
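The deep-copy behaviour of a Stream can be illustrated with a small ring buffer of fixed-size, word-aligned messages. This is a non-blocking sketch with hypothetical names (StreamModel, streamTrySend, streamTryRecv) and arbitrary sizes; it is not how RK0 lays out its stream control block.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define STREAM_MSG_WORDS 2U /* message size in 32-bit words */
#define STREAM_DEPTH     4U /* number of messages the ring holds */

typedef struct
{
    uint32_t buf[STREAM_DEPTH * STREAM_MSG_WORDS];
    unsigned head;  /* next message to read */
    unsigned tail;  /* next message to write */
    unsigned count; /* messages currently stored */
} StreamModel;

/* Copy one message from sender storage into the ring. */
int streamTrySend(StreamModel *s, const uint32_t *msg)
{
    if (s->count == STREAM_DEPTH)
    {
        return 0; /* full: a real sender would optionally block */
    }
    memcpy(&s->buf[s->tail * STREAM_MSG_WORDS], msg,
           STREAM_MSG_WORDS * sizeof(uint32_t));
    s->tail = (s->tail + 1U) % STREAM_DEPTH;
    s->count++;
    return 1;
}

/* Copy the oldest message from the ring into receiver storage. */
int streamTryRecv(StreamModel *s, uint32_t *out)
{
    if (s->count == 0U)
    {
        return 0; /* empty: a real receiver would optionally block */
    }
    memcpy(out, &s->buf[s->head * STREAM_MSG_WORDS],
           STREAM_MSG_WORDS * sizeof(uint32_t));
    s->head = (s->head + 1U) % STREAM_DEPTH;
    s->count--;
    return 1;
}
```

Because the payload is copied on both sides, the sender may reuse its buffer as soon as the send returns, which is the main contrast with a Mail Queue holding pointers.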
5.7.1. Mail Queue
Queue Control Block |
---|
Pointer to Buffer Address |
Write Position |
Read Position |
Max. number of mails |
Current number of mails |
Waiting queue |
Owner Task |
Mail Queues (or just Queues) are Mailboxes that can hold several messages in a FIFO queue. Indeed, a Mail Queue with a size of 1 will behave as a Mailbox.
The programmer must provide a buffer to hold N message addresses for a Queue. The main primitives are post(), pend(), peek(), and jam().
Peek reads the Queue front message without extracting it, and Jam places a message at the front of the queue, so that this message is served Last-In-First-Out.
Mails are enqueued in FIFO order (except when using jam()).
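The difference between post() and jam() comes down to which end of the ring is moved. The non-blocking model below uses hypothetical names (MqModel, mqTryPost, mqTryJam, mqTryPend) and an arbitrary depth; it only illustrates the ordering, not the RK0 Queue internals.

```c
#include <assert.h>
#include <stddef.h>

#define MQ_DEPTH 4U /* number of mail slots in the model */

typedef struct
{
    void *slot[MQ_DEPTH]; /* buffer of message addresses */
    unsigned head;        /* read position */
    unsigned tail;        /* write position */
    unsigned count;       /* mails currently stored */
} MqModel;

/* post: append at the back (FIFO). */
int mqTryPost(MqModel *q, void *mail)
{
    if (q->count == MQ_DEPTH)
    {
        return 0;
    }
    q->slot[q->tail] = mail;
    q->tail = (q->tail + 1U) % MQ_DEPTH;
    q->count++;
    return 1;
}

/* jam: step the read position back and insert at the front. */
int mqTryJam(MqModel *q, void *mail)
{
    if (q->count == MQ_DEPTH)
    {
        return 0;
    }
    q->head = (q->head + MQ_DEPTH - 1U) % MQ_DEPTH;
    q->slot[q->head] = mail;
    q->count++;
    return 1;
}

/* pend: extract the front mail. */
int mqTryPend(MqModel *q, void **out)
{
    if (q->count == 0U)
    {
        return 0;
    }
    *out = q->slot[q->head];
    q->head = (q->head + 1U) % MQ_DEPTH;
    q->count--;
    return 1;
}
```

Posting a, posting b and then jamming c makes the receiver see c first, followed by a and b in their original order.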
A single-slot Queue behaves as a Mailbox. Still, Mailboxes are provided as a distinct service from Queues because a Queue Control Block is roughly three times larger than a Mailbox, plus Queue methods are considerably heavier. As mailboxes are extremely handy, providing them as a standalone mechanism allows them to be composed with other features while keeping queues disabled entirely.
For both Mail Queues and Mailboxes, if your message is a 4-byte message (e.g., an INT value), it can (and probably should) be passed by copy: just cast to (VOID*) when transmitting, and cast back to INT when receiving.
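A minimal sketch of that cast, written in portable C with intptr_t as the intermediate type so the conversion does not depend on pointer width. The helper names packInt and unpackInt are hypothetical; in application code the casts would usually be written inline at the post and pend calls.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Carry a 32-bit value inside the pointer-sized message itself. */
void *packInt(int32_t value)
{
    return (void *)(intptr_t)value;
}

/* Recover the value on the receiving side. */
int32_t unpackInt(void *mail)
{
    return (int32_t)(intptr_t)mail;
}
```

One caveat follows from the Mailbox semantics described earlier: on common targets the value 0 packs to NULL, and a Mailbox whose storage is NULL is EMPTY, so a value that can legitimately be 0 likely needs an offset or a different encoding.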
5.7.1.1. Usage Example: Work Queue
Multiple producer tasks (Sensor, PID Controller, and UI) create Job_t objects and submit their addresses to a Mail Queue (jobQueue).
In this example, the worker thread logs system activity. As it runs on the lowest priority, it maintains system responsiveness with minimal intrusion.
The same pattern can support actual processing. You could either embed a function pointer in each job for fully dynamic behaviour or define a command ID and use a central dispatch table in the worker thread to invoke appropriate handlers (look at Usage Example: Supervisor Task).
The buffer for the Mail Queue must be a |
/* Job_t and queue definitions */
#define MAX_JOBS 8
typedef struct
{
BYTE length;
BYTE payload[64];
} Job_t;
static Job_t jobPoolBuf[MAX_JOBS] __attribute__((section("_user_heap")));
static RK_MEM jobPool;
static VOID *jobQueueBuf[MAX_JOBS];
static RK_QUEUE jobQueue;
/* Plant model state */
static volatile float plantTemp = 25.0f;
static const float ambientTemp = 20.0f;
/* Convert plantTemp to integer for logging */
INT readTemp(VOID)
{
return (INT)plantTemp;
}
/* Simulate button every 2s */
INT buttonPressed(VOID)
{
return ((kTickGet() % 2000) < 20); /* the condition will hold true for 20ms every 2s */
}
VOID kApplicationInit( VOID)
{
kMemInit( &jobPool, jobPoolBuf, sizeof(Job_t), MAX_JOBS);
kQueueInit( &jobQueue, jobQueueBuf, MAX_JOBS);
}
/* PID Controller Task (High priority) */
/* note: this is a sloppy zero-effort tuning
just for printing something */
VOID PIDControllerTask( VOID *args)
{
RK_UNUSEARGS
const float Kp=1.0f, Ki=0.1f, Kd=0.05f;
float prev=plantTemp;
float integral=0.0f;
const float dt=0.5f;
while(1)
{
/* Read plant state */
float measure = plantTemp;
/* PID compute */
float error = 25.0f - measure;
integral += error * dt;
float derivative = (measure - prev) / dt;
float output = Kp*error + Ki*integral - Kd*derivative;
prev = measure;
/* Apply to plant model */
/* the plant cooling model: (temp-amb)*0.1 */
plantTemp += (output - (plantTemp - ambientTemp)*0.1f) * dt;
/* Post log job */
Job_t *jobPtr = kMemAlloc( &jobPool);
if(jobPtr)
{
CHAR buf[32];
/* formats the float as text, for printf implementations without float support */
formatFloat (buf, sizeof(buf), output);
snprintf( (CHAR*)jobPtr->payload,
sizeof(jobPtr->payload),
"[CTRL] O=%s T=%d", buf, readTemp());
jobPtr->length = strlen((CHAR*)jobPtr->payload);
if(kQueuePost( &jobQueue, jobPtr, RK_NO_WAIT) != RK_SUCCESS)
{
/* the worker thread is the one freeing the memory blocks;
if the queue is full and we do not want to block,
we free the allocated block here, otherwise it would leak
*/
kMemFree( &jobPool, jobPtr);
}
}
}
kSleepPeriodic(500);
}
}
/* Sensor Task (Mid priority) */
VOID TempSensorTask( VOID *args)
{
RK_UNUSEARGS
while(1)
{
Job_t *jobPtr = kMemAlloc(&jobPool);
if(jobPtr)
{
snprintf( (CHAR*)jobPtr->payload, sizeof(jobPtr->payload),
"[SENSOR] T=%dC", readTemp());
jobPtr->length = strlen((CHAR*)jobPtr->payload);
if(kQueuePost( &jobQueue, jobPtr, RK_NO_WAIT) != RK_SUCCESS)
{
kMemFree( &jobPool, jobPtr);
}
}
kSleepPeriodic(1000);
}
}
/* UI Task (Low priority) */
/* this is to cause a temperature disturbance */
VOID UIButtonTask( VOID *args)
{
RK_UNUSEARGS
while(1)
{
if(buttonPressed())
{
plantTemp -= plantTemp*0.15f; /* disturb the temperature */
Job_t *jobPtr = kMemAlloc( &jobPool);
if(jobPtr)
{
snprintf((CHAR*)jobPtr->payload, sizeof(jobPtr->payload),
"[BTN] Temp: %d", (INT)plantTemp);
jobPtr->length = strlen((CHAR*)jobPtr->payload);
if(kQueuePost( &jobQueue, jobPtr, RK_NO_WAIT) != RK_SUCCESS)
{
kMemFree( &jobPool, jobPtr);
}
}
}
kSleepPeriodic( 2000);
}
}
/* Worker Task (Lowest priority) */
VOID WorkerTask( VOID *args)
{
RK_UNUSEARGS
Job_t *jobPtr = NULL;
while(1)
{
if(kQueuePend( &jobQueue, (VOID**)&jobPtr, RK_WAIT_FOREVER)==RK_SUCCESS)
{
printf("[WORKER] %s\r\n", jobPtr->payload);
kMemFree( &jobPool, jobPtr);
}
}
}
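The command-ID variant mentioned for this pattern can be sketched as below. Everything here is hypothetical (CmdJob, the two commands, workerDispatch); it only shows the shape of a central dispatch table the worker would consult for each job it receives, not kernel code.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical job layout: a command ID selects the handler. */
typedef enum { CMD_LOG = 0, CMD_ADD, CMD_COUNT } CmdId;

typedef struct
{
    CmdId cmd;
    int arg;
} CmdJob;

static int logCount = 0;    /* how many CMD_LOG jobs were handled */
static int accumulator = 0; /* running sum built by CMD_ADD jobs */

static void handleLog(const CmdJob *job) { (void)job; logCount++; }
static void handleAdd(const CmdJob *job) { accumulator += job->arg; }

/* Central dispatch table, indexed by command ID. */
static void (*const dispatch[CMD_COUNT])(const CmdJob *) = {
    [CMD_LOG] = handleLog,
    [CMD_ADD] = handleAdd,
};

/* What the worker would do with each job taken from the queue.
   Returns 0 if a handler ran, -1 for an unknown command. */
static int workerDispatch(const CmdJob *job)
{
    assert(job != NULL);
    if ((unsigned)job->cmd >= (unsigned)CMD_COUNT || dispatch[job->cmd] == NULL)
        return -1;
    dispatch[job->cmd](job);
    return 0;
}
```

Compared with a function pointer embedded in each job, the table keeps the set of reachable handlers closed and checkable, at the cost of one indirection through the ID.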

5.7.1.2. A Logger Pattern
This is the logPost(…) utility that is being used in some examples.
#include <stdio.h> /* needed for printf */
#include <stdarg.h> /* needed for va_list, va_start, va_end */
#include <kstring.h> /* if including <string.h> RK_* custom string operations are remapped to string.h */
/* this version uses printf. one could just store data on the queue to inspect */
#define LOGLEN 64
#define LOGBUFSIZ 16
struct log
{
RK_TICK t;
CHAR s[LOGLEN];
}RK_ALIGN(4);
typedef struct log Log_t;
/* memory partition pool */
Log_t qMemBuf[LOGBUFSIZ] __K_HEAP;
/* buffer for the mail queue */
VOID *qBuf[LOGBUFSIZ];
/* mail queue */
RK_QUEUE logQ;
/* mem allocator */
RK_MEM qMem;
/* (v)printf needs a custom _write() backend syscall, typically using UART */
void kprintf(const char *fmt, ...)
{
va_list args;
va_start(args, fmt);
/* printing to stderr; if there is no particular stderr (fd==2) redirection on _write(), this has the same effect of vprintf(fmt, args), using stdout (fd==1) */
vfprintf(stderr, fmt, args);
va_end(args);
}
VOID logPost(const char *fmt, ...)
{
kSchLock();
Log_t *logPtr = (Log_t*)kMemAlloc(&qMem);
if (logPtr)
{
logPtr->t = kTickGetMs();
va_list args;
va_start(args, fmt);
int len = vsnprintf(logPtr->s, sizeof(logPtr->s), fmt, args);
va_end(args);
/* if len >= size of the buffer, the message has been truncated */
if (len >= (int)sizeof(logPtr->s))
{
/* overwrite the tail with "..." to mark where the truncation happened */
RK_STRCPY(&logPtr->s[sizeof(logPtr->s) - 4], "...");
}
/* if queue post fails, free memory so it does not leak */
if (kQueuePost(&logQ, (VOID*)logPtr, RK_NO_WAIT) != RK_SUCCESS)
kMemFree(&qMem, logPtr);
}
kSchUnlock();
}
/* you can make it as a module and publish the logPost API */
/* Usage Example: */
VOID someTask(VOID* args)
{
...
while(1)
{
UINT value = getValue();
logPost("Value is %u", value);
}
}
/* If printing to a terminal, use a low-priority LogTask: */
VOID LogTask(void* args)
{
(void)args;
while (1)
{
VOID* recvPtr=NULL;
if (kQueuePend(&logQ, &recvPtr, RK_WAIT_FOREVER) == RK_SUCCESS)
{
kassert(recvPtr!=NULL);
Log_t* logPtr = (Log_t*)recvPtr;
kprintf("%lu ms :: %s\r\n", logPtr->t, logPtr->s);
kMemFree(&qMem, recvPtr);
}
}
}
5.7.2. Stream Queue
Message Stream Control Block |
---|
Storage address |
Write Address |
Read Address |
Message Block Size |
Max of messages |
Message Count |
Waiting Queue |
Owner Task |
Streams resemble classic (named) Pipes. The difference is that messages have a fixed size. On the other hand, pipes transmit and receive any number of bytes for each operation.
For each Stream, the user provides a buffer address with enough capacity (number of messages x message size). Then, the kernel will handle it as a ring buffer.
The message size associated with a Message Stream instance is defined at its initialisation. On transmission, a message is deep copied from the sender’s storage to the queue; on reception, it is copied from the queue to the receiver’s storage.
Although a message size is associated with a Stream Queue object, the concrete message type depends on the application. |
The important primitives for Message Streams are send(), recv(), jam() and peek().
Sending to a full queue (optionally) blocks the sender. Likewise, receiving from an empty queue (optionally) blocks the receiver.
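The deep-copy behaviour can be pictured with the sketch below: a ring of fixed-size slots where send copies words in and recv copies words out. It is a toy model with hypothetical names (ToyStream, toySend, toyRecv), it returns -1 where a real kernel could block, and it assumes 32-bit words.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Toy model of a Stream Queue. Names are hypothetical, not the RK0 API. */
typedef struct
{
    uint32_t *buf;    /* storage: nMesg * mesgWords words */
    size_t mesgWords; /* message size, in words */
    size_t nMesg;     /* max number of messages */
    size_t writeIdx;  /* next slot to write */
    size_t readIdx;   /* next slot to read */
    size_t count;     /* messages currently stored */
} ToyStream;

static void copyWords(uint32_t *dst, const uint32_t *src, size_t words)
{
    while (words--)
        *dst++ = *src++;
}

/* send(): deep copy from the sender's storage into the ring. */
static int toySend(ToyStream *s, const void *mesg)
{
    assert(s != NULL && mesg != NULL);
    if (s->count == s->nMesg)
        return -1; /* full: a real kernel could block the sender here */
    copyWords(&s->buf[s->writeIdx * s->mesgWords], mesg, s->mesgWords);
    s->writeIdx = (s->writeIdx + 1) % s->nMesg;
    s->count++;
    return 0;
}

/* recv(): deep copy from the ring into the receiver's storage. */
static int toyRecv(ToyStream *s, void *mesg)
{
    assert(s != NULL && mesg != NULL);
    if (s->count == 0)
        return -1; /* empty: a real kernel could block the receiver here */
    copyWords(mesg, &s->buf[s->readIdx * s->mesgWords], s->mesgWords);
    s->readIdx = (s->readIdx + 1) % s->nMesg;
    s->count--;
    return 0;
}
```

Because the queue holds its own copy, the sender may overwrite its local message right after sending without affecting what the receiver will read.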
5.7.2.1. Stream Message-Size
Stream Queues have a fixed message size, expressed in words, and the number of words must be a power of two: 1, 2, 4, 8, 16, 32… (words).
RK0 does not establish an upper bound, although I would say that a good cap is 4 words for the regular RK0 target. One has to experiment, though. If a message becomes too large, the copy introduces prohibitive latency; in that case, transmit the message address instead, i.e., configure the Stream to carry a 1-word message.
-
Load/Store instructions are optimised to fetch 32-bit words. If messages are sized and aligned on 4-byte boundaries, each word is moved with a single aligned load and a single aligned store.
-
A power-of-two constraint is a CPU-aware design choice to prevent misalignment issues.
-
Misaligned memory makes casts unsafe, leading to complex faults, performance penalties or undefined behaviour.
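The size rule can be checked offline with a few lines of C. This is a sketch under my own assumptions: a 32-bit word, and hypothetical helper names (bytesToWords, isValidMesgWords, mesgWordsFor) that are not part of the kernel.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Round a size in bytes up to a whole number of 32-bit words. */
static size_t bytesToWords(size_t bytes)
{
    return (bytes + sizeof(uint32_t) - 1) / sizeof(uint32_t);
}

/* A valid stream message size is a non-zero power of two, in words:
   a power of two has exactly one bit set, so n & (n - 1) is zero. */
static int isValidMesgWords(size_t words)
{
    return (words != 0) && ((words & (words - 1)) == 0);
}

/* Words to configure for a message type; trips an assertion when the
   type does not fit the rule and needs padding (or a pointer instead). */
static size_t mesgWordsFor(size_t bytes)
{
    size_t words = bytesToWords(bytes);
    assert(isValidMesgWords(words));
    return words;
}
```

For instance, an 8-byte message maps to 2 words and passes, while a 12-byte one maps to 3 words and would need 4 bytes of padding to reach 4 words.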
Deep Copies are usually needed for message passing but introduce significant overhead. Design choice: Be CPU-aware and constrain data size to power of two words. Benefits: speeds up the copy, achieves more deterministic behaviour, improves run-time safety. |
Code-wise, we optimise using pointer arithmetic on pointers to words:
/* Optimised deep copy; guaranteed mesgSize>0 */
/* destPtr and srcPtr are pointers to a word */
#define RK_CPY(destPtr, srcPtr, mesgSize) \
do { \
while (--(mesgSize)) \
{ \
/* if mesgSize is 1, this is NOT executed */ \
*((destPtr)++) = *((srcPtr)++); \
} \
/* the last or the only copy is executed now */ \
*((destPtr)++) = *((srcPtr)++); \
} while(0U)
Stream Queues are far less error-prone than Mail Queues, given the intensive pointer handling performed with Mail Queues. |
5.7.2.2. Usage Example: Averaging Sensor Values
Below is an illustrative snippet of a Queueing Pattern.
The goal is to calculate the average value of 4 types of sensors.
It is convenient to highlight an important aspect here: given its reactive nature, real-time system software is typically I/O-bound. Tasks that are sensitive to I/O activity have higher priority than CPU-bound tasks, i.e., those processing data.
A task receives measured sensor values from an ISR at a periodic rate. (A Soft Timer emulates the ISR.)
Then it enqueues this data to a consumer, which computes the average value for each of the 4 sensors.
The inter-task communication is designed as follows:
-
The producer pends on a Mailbox that an ISR posts to. An application timer emulates this ISR.
-
The data extracted from the Mailbox is placed in a queue with the processing task as the consumer.
-
As the producer’s priority must be higher than that of the consumer, eventually, the queue will get full.
-
The producer drops the last message when the queue is full and signals the consumer.
-
Now the consumer has a batch of data to work on until the next sensor update. It will block (pend on a signal) whenever the queue is empty.
Here, the queue size was set at 8 items. This is an arbitrary value; the optimal queue size would take into account system edge cases: 'What is the state that represents the most inconvenient time to be interrupted by this sensor?'
#define kPend(timeout) \
do \
{ \
kSignalGet(0x1, RK_FLAGS_ANY, NULL, timeout); \
} while (0)
#define kSignal(taskhandle) \
do \
{ \
kSignalSet(taskhandle, 0x01); \
} while (0)
typedef enum
{
TEMPERATURE=1, HUMIDITY, CO2, FLOW
}SensorType_t;
/* sensor types */
struct sensorMsg
{
SensorType_t sensorType;
ULONG sensorValue;
};
typedef struct sensorMsg Mesg_t;
#define N_MESSAGE 8
#define MESSAGE_SIZE ((sizeof(Mesg_t) + sizeof(ULONG) - 1)/sizeof(ULONG)) /* WORDS! */
#define N_SENSOR 4
#define AVG_WINDOW_SIZE 10 /* 10 samples */
RK_STREAM sensorStream;/* the stream kobject */
Mesg_t mesgBuf[N_MESSAGE] = {0};/* queue buffer */
RK_TIMER timerT1;
RK_MBOX sensorBox;
static Mesg_t sample = {0};
static UINT sampleErr;
VOID callBackISR(VOID* ARGS);
VOID kApplicationInit( VOID)
{
RK_ERR err = kStreamInit(&sensorStream, (VOID*) mesgBuf, MESSAGE_SIZE,
N_MESSAGE);
kassert(err==RK_SUCCESS);
/* timer @ every 10 ms */
err = kTimerInit(&timerT1, 0, 10, callBackISR, NULL, RK_TIMER_RELOAD);
kassert( err==RK_SUCCESS);
err = kMboxInit(&sensorBox, NULL);
kassert( err==RK_SUCCESS);
}
VOID callBackISR(VOID *args)
{
RK_UNUSEARGS
sample.sensorType = (rand() % 4) + 1;
switch (sample.sensorType)
{
case TEMPERATURE:
sample.sensorValue = ( ULONG) rand() % 50;
break;
case HUMIDITY:
sample.sensorValue = ( ULONG) rand() % 100;
break;
case CO2:
sample.sensorValue = ( ULONG) rand() % 1000;
break;
case FLOW:
sample.sensorValue = ( ULONG) rand() % 10;
break;
default:
break;
}
RK_ERR err = kMboxPost( &sensorBox, &sample, RK_NO_WAIT);
if (err != RK_SUCCESS)
sampleErr ++;
}
/* Producer - higher priority, blocks on mailbox */
VOID Task1(VOID *args)
{
RK_UNUSEARGS
Mesg_t *recvSample = NULL;
while (1)
{
RK_ERR errmbox = kMboxPend( &sensorBox, ( VOID**) &recvSample,
RK_WAIT_FOREVER);
kassert( errmbox==RK_SUCCESS);
RK_ERR err = kStreamSend( &sensorStream, recvSample, RK_NO_WAIT);
kassert(err >= 0); /* either successful or unsuccessful */
if (err == RK_SUCCESS)
{
CHAR const *sensorTypeStr = NULL;
if (recvSample->sensorType == 1)
sensorTypeStr = "TEMP";
if (recvSample->sensorType == 2)
sensorTypeStr = "HUM";
if (recvSample->sensorType == 3)
sensorTypeStr = "CO2";
if (recvSample->sensorType == 4)
sensorTypeStr = "FLOW";
printf( "ENQ: [@%lums, %s, %lu] \r\n", kTickGet(), sensorTypeStr,
recvSample->sensorValue);
}
else if (err == RK_ERR_STREAM_FULL)
{
kSignal(task2Handle);
}
}
}
/* for each sensor:
. a ring buffer of AVG_WINDOW_SIZE values
. sum of values
. an index table (=enum - 1 eg., HUMIDITY IDX=2-1=1)
*/
static ULONG ringBuf[N_SENSOR][AVG_WINDOW_SIZE];
static ULONG ringSum[N_SENSOR] = {0};
static UINT ringIndex[N_SENSOR] = {0};
void Task2( void *args)
{
RK_UNUSEARGS
Mesg_t readSample;
while (1)
{
RK_ERR err = kStreamRecv(&sensorStream, (VOID*)&readSample,
RK_NO_WAIT);
if (err == RK_SUCCESS)
{
UINT sensorIdx = readSample.sensorType - 1;
/* remove oldest sample */
ULONG oldest = ringBuf[sensorIdx][ringIndex[sensorIdx]];
ringSum[sensorIdx] -= oldest;
/* push new sample */
ringBuf[sensorIdx][ringIndex[sensorIdx]] = readSample.sensorValue;
ringSum[sensorIdx] += readSample.sensorValue;
/* index incr-wrap */
ringIndex[sensorIdx] ++;
ringIndex[sensorIdx] %= AVG_WINDOW_SIZE;
/* simple average */
ULONG avg = ringSum[sensorIdx] / AVG_WINDOW_SIZE;
CHAR const *sensorTypeStr = NULL;
if (readSample.sensorType == 1)
sensorTypeStr = "TEMP";
if (readSample.sensorType == 2)
sensorTypeStr = "HUM";
if (readSample.sensorType == 3)
sensorTypeStr = "CO2";
if (readSample.sensorType == 4)
sensorTypeStr = "FLOW";
printf( "DEQ: [@%lums, %s, %lu] | AVG: %lu \r\n", kTickGet(),
sensorTypeStr, readSample.sensorValue, avg);
}
else
{
kPend(RK_WAIT_FOREVER);
}
}
}
OUTPUT:
ENQ: [@550ms, CO2, 571]
ENQ: [@560ms, FLOW, 4]
ENQ: [@570ms, FLOW, 4]
ENQ: [@580ms, HUM, 25]
ENQ: [@590ms, CO2, 931]
ENQ: [@600ms, CO2, 487]
ENQ: [@610ms, FLOW, 7]
ENQ: [@620ms, HUM, 79]
>>> Queue is full. Now offload and process. Note the order remains <<<
DEQ: [@630ms, CO2, 571] | AVG: 460
DEQ: [@631ms, FLOW, 4] | AVG: 5
DEQ: [@632ms, FLOW, 4] | AVG: 5
DEQ: [@633ms, HUM, 25] | AVG: 52
DEQ: [@634ms, CO2, 931] | AVG: 553
DEQ: [@635ms, CO2, 487] | AVG: 549
DEQ: [@636ms, FLOW, 7] | AVG: 5
DEQ: [@637ms, HUM, 79] | AVG: 55
>>> Consumer is preempted <<<
ENQ: [@640ms, CO2, 913]
ENQ: [@650ms, CO2, 134]
ENQ: [@660ms, HUM, 47]
ENQ: [@670ms, HUM, 30]
ENQ: [@680ms, TEMP, 7]
ENQ: [@690ms, CO2, 726]
ENQ: [@700ms, FLOW, 7]
ENQ: [@710ms, TEMP, 43]
DEQ: [@720ms, CO2, 913] | AVG: 578
DEQ: [@721ms, CO2, 134] | AVG: 543
DEQ: [@722ms, HUM, 47] | AVG: 51
DEQ: [@723ms, HUM, 30] | AVG: 44
DEQ: [@724ms, TEMP, 7] | AVG: 20
DEQ: [@725ms, CO2, 726] | AVG: 592
DEQ: [@726ms, FLOW, 7] | AVG: 5
DEQ: [@727ms, TEMP, 43] | AVG: 23
5.7.3. Summing Up: Stream Queues vs Mail Queues
The table below summarises how Stream and Mail Queues differ:
Feature | Mail Queue (Pointer-Based) | Stream Queue (Deep Copy-Based) |
---|---|---|
Message Storage | Stores pointers to messages | Stores deep copies of messages |
Message Size | 4-byte. Typical use case is to transmit the address of a message. | Fixed (defined at queue initialisation). |
Memory Management | Typically used along with a partition pool, for zero-copy or one-copy message passing. | Static buffer, N-words/message. N is a power of 2. |
Data Ownership | Sender/receiver manage lifecycle | Kernel. |
Performance | A 'zero-copy' transmission is faster. | Deterministic. Kernel Optimised deep-copy. |
Best Use Cases | Work Queues, Client-Server with dynamic payload, any case where zero-copy or 1-copy is feasible | Real-time data streaming (e.g., sensor pipelines, inter-device communication). |
5.7.4. Installing Notify Callbacks
Mailboxes, Mail and Stream Queues allow installing callbacks to notify send/receive operations.
A callback has the signature VOID cbk(<RK_MBOX|RK_QUEUE|RK_STREAM> *).
5.7.4.1. Usage Example: Queue Select
It is not uncommon to have a gatekeeper or supervisor task listening to several queues at once.
In this snippet, a sendNotify callback is installed on each queue; it signals the supervisor task. This task runs every 100ms, coalescing about two posts from each mail queue.
/* Notify Callback on Queues */
/* each queue has registered this send callback */
VOID sendNotify(RK_QUEUE *qPtr)
{
UINT i = 0;
for (i = 0; i < 3; ++i)
{
if (queues[i] == qPtr)
{
ULONG qFlag = 1UL << i;
kSignalSet(superHandle, qFlag);
break;
}
}
}
/* tasks sending follow this pattern */
VOID Task2(VOID *args)
{
RK_UNUSEARGS
UINT num = 0x20;
while (1)
{
MESG_t *ptr;
ptr = (MESG_t *)kMemAlloc(&queueMem);
if (ptr)
{
ptr->num = num++;
ptr->senderID = RK_RUNNING_PID;
RK_ERR err = kQueuePost(&queue2, (VOID *)ptr, RK_NO_WAIT);
if (err != RK_SUCCESS)
{
kMemFree(&queueMem, (VOID *)ptr);
kprintf("Q2 FULL\n\r");
}
}
kSleepPeriodic(50);
}
}
/* supervisor listening on 3 queues */
/*
although it has a longer period, it also has the highest priority, which is acceptable for supervisors.
each posting task completes about two successful posts between runs, and the supervisor drains them
every 100ms
*/
VOID SupervisorTask(VOID *args)
{ RK_UNUSEARGS
static ULONG gotFlags = 0UL;
while (1)
{
gotFlags = 0UL;
kSignalGet(0x7, RK_FLAGS_ANY, &gotFlags, RK_NO_WAIT);
UINT k = 0;
/* we drain each queue that is set */
for (k = 0; k < 3; ++k)
{
if (gotFlags & (1UL << k))
{
VOID *recvPtr = NULL;
while (
kQueuePend(queues[k], &recvPtr,
RK_NO_WAIT) == RK_SUCCESS)
{
MESG_t *m = (MESG_t*)recvPtr;
/*1-copy message passing */
UINT id = m->senderID;
UINT num = m->num;
kMemFree(&queueMem, recvPtr);
UINT sel = k + 1; /* bit position */
logPost(" sel: %d, senderID: %d, payload: 0x%02X", sel, id, num);
}
}
}
kSleepPeriodic(100);
}
}
OUTPUT:
36500 ms :: sel: 1, senderID: 5, payload: 0x2E8
36500 ms :: sel: 1, senderID: 5, payload: 0x2E9
36500 ms :: sel: 2, senderID: 3, payload: 0x2F8
36500 ms :: sel: 2, senderID: 3, payload: 0x2F9
36500 ms :: sel: 3, senderID: 4, payload: 0x308
36500 ms :: sel: 3, senderID: 4, payload: 0x309
36600 ms :: sel: 1, senderID: 5, payload: 0x2EA
36600 ms :: sel: 1, senderID: 5, payload: 0x2EB
36600 ms :: sel: 2, senderID: 3, payload: 0x2FA
36600 ms :: sel: 2, senderID: 3, payload: 0x2FB
36600 ms :: sel: 3, senderID: 4, payload: 0x30A
36600 ms :: sel: 3, senderID: 4, payload: 0x30B
36700 ms :: sel: 1, senderID: 5, payload: 0x2EC
36700 ms :: sel: 1, senderID: 5, payload: 0x2ED
36700 ms :: sel: 2, senderID: 3, payload: 0x2FC
36700 ms :: sel: 2, senderID: 3, payload: 0x2FD
36700 ms :: sel: 3, senderID: 4, payload: 0x30C
36700 ms :: sel: 3, senderID: 4, payload: 0x30D
36800 ms :: sel: 1, senderID: 5, payload: 0x2EE
36800 ms :: sel: 1, senderID: 5, payload: 0x2EF
36800 ms :: sel: 2, senderID: 3, payload: 0x2FE
36800 ms :: sel: 2, senderID: 3, payload: 0x2FF
36800 ms :: sel: 3, senderID: 4, payload: 0x30E
36800 ms :: sel: 3, senderID: 4, payload: 0x30F
36900 ms :: sel: 1, senderID: 5, payload: 0x2F0
36900 ms :: sel: 1, senderID: 5, payload: 0x2F1
36900 ms :: sel: 2, senderID: 3, payload: 0x300
5.8. Message Passing ownership
Using queues to communicate between multiple tasks is chaos. Many senders to many receivers can be unpredictable. We often want N:1 (senders:receiver, N can be 1). Having a single receiver makes it easier to reason about the dynamics.
In real-time design, we often expect to see blocking send() operations on 1:1 or N:1 channels—a blocking send() on a 1:N (broadcast) would be very odd.
5.8.1. Priority Inversion on Message Passing
Priority Inversion happens on Message-Passing for similar but subtly different reasons from resource sharing. Design Choice: Add an ownership mechanism for a message-passing object—a well-defined receiver—so priority propagation can be applied. Benefit: This preserves strict real-time guarantees, making sure a high-priority task never waits indefinitely for a lower-priority task to finish message operations |
While sharing some similarities, there are subtle differences between blocking on a shared resource (by blocking on a locked mutex) and blocking on a message-passing object.
Assuming cases we do not want messages to be overwritten, a sender, when accessing a queue, is acquiring an empty buffer. A receiver is acquiring a full buffer. They are competing for the same object but in different states. Thus, they depend on each other to change the object state.
When a sender blocks on a full shared message object, it does not mean another writer is using the resource; by design, it is also unlikely that there is a reader blocked on the waiting queue of the object, since every time a write operation completes, any reader blocked on the queue is readied. Whether it is dispatched or not is a scheduler concern. If its priority is higher than that of the task that just finished, it will be immediately dispatched. If not, it is enqueued in the ready queue until it is eventually picked up.
This means the priority inversion problem arises from waiting for the consumer rather than from direct contention among multiple senders. |
So, if the sender’s priority is higher, it could be propagated to the reader. But, which reader? (This is why semaphores cannot implement priority inheritance protocol — the waiter task cannot know a potential signaller).
With that in mind, there is the option to set ownership: set owner (mesgpass, task handle). From now on, only the owner task can receive from that service object, so a blocking send() knows the target task and can raise its priority.
(As 1:N communication is normally non-blocking on real-time systems, there is no mechanism to establish 'sender ownership'.)
If another task that is not the owner tries to receive from a kernel message-passing object that has an owner, it fails.
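The ownership idea can be sketched as a toy model. All names below (ToyTask, ToyPort, toyBlockingSend, and so on) are hypothetical, a lower number means a higher priority, and the point at which the owner drops back to its nominal priority is my assumption rather than something stated above.

```c
#include <assert.h>
#include <stddef.h>

/* Toy model; all names are hypothetical. Lower number = higher priority. */
typedef struct
{
    int nominalPrio; /* priority assigned at creation */
    int prio;        /* effective priority, may be boosted */
} ToyTask;

typedef struct
{
    ToyTask *owner; /* the only task allowed to receive, or NULL */
    int full;       /* 1 when no free slot is left */
} ToyPort;

static void toySetOwner(ToyPort *p, ToyTask *t) { p->owner = t; }

/* Receive is refused for any task that is not the owner. */
static int toyRecvAllowed(const ToyPort *p, const ToyTask *caller)
{
    return (p->owner == NULL) || (p->owner == caller);
}

/* A sender about to block on a full object lends its priority to the
   owner, so the receiver can run and free a slot. Returns 1 if boosted. */
static int toyBlockingSend(ToyPort *p, const ToyTask *sender)
{
    assert(p != NULL && sender != NULL);
    if (p->full && p->owner != NULL && sender->prio < p->owner->prio)
    {
        p->owner->prio = sender->prio;
        return 1;
    }
    return 0;
}

/* Assumption: once the owner has received, it returns to nominal priority. */
static void toyOwnerReceived(ToyPort *p)
{
    p->full = 0;
    if (p->owner != NULL)
        p->owner->prio = p->owner->nominalPrio;
}
```

Without a known owner, toyBlockingSend() would have no task to boost, which is the semaphore limitation mentioned earlier.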
These kernel objects now will resemble an aspect of Ports, a common way of representing tasks on message-passing kernels. (Strictly, they are not Ports, as RK0 is not a message-passing kernel—although I do like the approach.)
5.9. Most-Recent Message Protocol (MRM)
MRM Control Block |
---|
MRM Buffer Allocator |
Data Buffer Allocator |
Current MRM Buffer Address |
Data Size (Message Size) |
MRM Buffer |
---|
Data Buffer Address |
Readers Count |
Data Buffer |
---|
Application-dependent |
There is little practical difference between a message that does not arrive and one with no valid (stale) data. But when wrong (or stale) data is processed - e.g., to define a set point on a loop - a system can fail badly. Design Choice: provide a broadcast asynchronous message-passing scheme that guarantees data freshness and integrity for all readers. Benefits: The system has a mechanism to meet strict deadlines that cannot be predicted at design time. |
Control loops reacting to unpredictable time events—like a robot scanning an environment or a drive-by-wire system—require a different message-passing approach. Readers cannot "look at the past" and cannot block. The most recent data must be delivered lock-free and have guaranteed integrity.
5.9.1. Functional Description
An MRM works as a 1-to-many asynchronous Mailbox - with a lock-free specialisation that enables several readers to get the most recent deposited message with no integrity issues. Whenever a reader reads an MRM buffer, it will find the most recent data transmitted. It can also be seen as an extension of the Double Buffer pattern for a 1:N communication.
The core idea of the MRM protocol is that readers can only access the buffer that is classified as the 'most recent buffer'. After a writer calls publish(), that will be the only message readers can get(). Any former message still being processed by a reader was grabbed before the new publish() and, from now on, can only be released with unget(), eventually returning to the pool.
To clarify further, the communication steps are listed:
-
A producer first reserves an MRM Buffer - the reserved MRM Buffer is not available for reading until it is published.
-
A message buffer is allocated and filled, and its address is stored in the MRM Buffer. The producer publishes the message. From now on, it is the most recent message. Any formerly published buffer is no longer visible to new readers.
-
A reader starts by getting an MRM Buffer. A get() operation delivers a copy of the message to the reader’s scope. Importantly, this operation increases the number of readers associated with that MRM Buffer.
Before ending its cycle, the task releases (unget()) the buffer; on releasing, the kernel checks if the caller task is the last reader and if the buffer being released is not the current MRM Buffer.
If the above conditions are met, the unget() operation will return the MRM buffer to the pool. If there are more readers, OR if it is the current buffer, it remains available.
When the reserve operation detects that the most recent buffer still has readers, a new buffer is allocated to be written and published. If it has no readers, it is reused.
This way, the worst case is a sequence of publish() with no unget() at all, which would lead to the writer finding no buffer to reserve. This is prevented by making: N Buffers = N tasks + 1.
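The bookkeeping described above can be followed in a toy, single-threaded model. Names are hypothetical (ToyMrm, toyReserve, toyPublish, toyGet, toyUnget), an int stands in for the data buffer, and none of the lock-free machinery is reproduced. One rule is my own assumption, marked in the code: a superseded buffer that has no readers at publish time goes straight back to the pool.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_N_BUF 3 /* e.g. 2 tasks + 1 */

typedef struct
{
    int data;    /* stands in for the data buffer */
    int readers; /* readers currently holding this buffer */
    int inUse;   /* 0 while the buffer sits in the pool */
} ToyMrmBuf;

typedef struct
{
    ToyMrmBuf pool[TOY_N_BUF];
    ToyMrmBuf *current; /* most recent published buffer */
} ToyMrm;

/* reserve(): reuse the current buffer if nobody reads it, else take
   a free one from the pool. Returns NULL if the pool is exhausted. */
static ToyMrmBuf *toyReserve(ToyMrm *m)
{
    if (m->current != NULL && m->current->readers == 0)
        return m->current;
    for (size_t i = 0; i < TOY_N_BUF; ++i)
    {
        if (!m->pool[i].inUse)
        {
            m->pool[i].inUse = 1;
            return &m->pool[i];
        }
    }
    return NULL;
}

/* publish(): fill the buffer and make it the most recent one. */
static void toyPublish(ToyMrm *m, ToyMrmBuf *b, int data)
{
    ToyMrmBuf *old = m->current;
    b->data = data;
    m->current = b;
    /* assumption: a superseded buffer with no readers returns to the pool */
    if (old != NULL && old != b && old->readers == 0)
        old->inUse = 0;
}

/* get(): copy out the most recent data and count the reader in. */
static ToyMrmBuf *toyGet(ToyMrm *m, int *out)
{
    ToyMrmBuf *b = m->current;
    if (b == NULL)
        return NULL; /* nothing published yet */
    *out = b->data;
    b->readers++;
    return b;
}

/* unget(): count the reader out; the last reader of a superseded
   buffer returns it to the pool, the current buffer stays available. */
static void toyUnget(ToyMrm *m, ToyMrmBuf *b)
{
    assert(b != NULL && b->readers > 0);
    b->readers--;
    if (b->readers == 0 && b != m->current)
        b->inUse = 0;
}
```

Walking through it: a reader holding the first published buffer forces the next reserve to pick a new buffer; once that reader ungets, the old buffer returns to the pool, while the current one stays available even with zero readers.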
5.9.2. MRM Control Block Configuration
What might lead to some confusion when initialising an MRM Control Block is the need for two different pools:
-
One pool will be the storage for the MRM Buffers, which is the data structure for the mechanism.
-
Another pool is for the actual payload. The messages.
Both pools must have the same number of elements: the number of tasks communicating + 1.
-
The size of the data buffers is application-dependent - and is passed as a number of words. The minimal message size is 32-bit.
-
If using data structures, keep them aligned to 4 bytes to take advantage of the performance of aligned memory.
5.9.3. Usage Example
Consider a modern car - speed variations are of interest in many modules. With a somewhat "naive" approach, let us consider three modules and how they should react when speed varies:
-
Cruise Control: For the Cruise Control, a speed increase might signify the driver wants manual control back, and it will likely turn off.
-
Windshield Wipers: If they are on, a speed change can affect the electric motor’s adjustments to the air resistance.
-
Radio: Speed changes reflect the aerodynamic noise - the radio volume might need adjustment.
As the variations are unpredictable, we need a mechanism to deliver the latest speed to all these modules, in order of importance. From highest to lowest priority, Cruise, Wipers, and Radio are the three modules that range from safety to comfort.
To emulate this scenario, we can write an application with a higher priority task that sleeps and wakes up at pseudo-random times to produce random values that represent the (unpredictable) speed changes.
The snippet below has 4 periodic tasks. Tasks sleep for absolute periods. The producer publishes new data at a random interval, so it can either interrupt before one has the chance to finish or be inactive while it runs more than once.
typedef struct
{
UINT speed;
RK_TICK timeStamp;
} Mesg_t;
#define N_MRM (5) /* Number of MRMs N Tasks + 1 */
#define MRM_MESG_SIZE (sizeof(Mesg_t)/4) /* In WORDS */
RK_MRM MRMCtl;/* MRM control block */
RK_MRM_BUF buf[N_MRM];/* MRM pool */
Mesg_t data[N_MRM];/* message data pool */
VOID kApplicationInit( VOID)
{
kCreateTask(&speedSensorHandle, SpeedSensorTask, RK_NO_ARGS, "SpeedTsk", stack1, STACKSIZE, 1, RK_PREEMPT);
kCreateTask(&cruiserHandle, CruiserTask, RK_NO_ARGS, "CruiserTsk", stack2, STACKSIZE, 2, RK_PREEMPT);
kCreateTask(&wiperHandle, WiperTask, RK_NO_ARGS, "WiperTsk", stack3, STACKSIZE, 3, RK_PREEMPT);
kCreateTask(&radioHandle, RadioTask, RK_NO_ARGS, "RadioTsk", stack4, STACKSIZE, 4, RK_PREEMPT);
kMRMInit( &MRMCtl, buf, data, N_MRM, MRM_MESG_SIZE);
}
VOID SpeedSensorTask( VOID *args)
{
RK_UNUSEARGS
Mesg_t sendMesg = {0};
while (1)
{
RK_TICK currTick = kTickGet();
UINT speedValue = (UINT) (rand() % 170) + 1;
sendMesg.speed = speedValue;
sendMesg.timeStamp = currTick;
/* grab a buffer */
RK_MRM_BUF *bufPtr = kMRMReserve( &MRMCtl);
if (bufPtr != NULL)
{
kMRMPublish( &MRMCtl, bufPtr, &sendMesg);
}
else
{/* cannot fail */
kassert( 0);
}
/* publish */
printf( "! @ %dT: SPEED UPDATE: %u \r\n", currTick, speedValue);
RK_TICK sleepTicks = (( RK_TICK) rand() % 15) + 1;
kSleepPeriodic( sleepTicks);
}
}
VOID CruiserTask( VOID *args)
{
RK_UNUSEARGS
Mesg_t recvMesg = {0};
while (1)
{
RK_MRM_BUF *readBufPtr = kMRMGet( &MRMCtl, &recvMesg);
printf( "@ %dT CRUISER: (%u, %uT) \r\n", kTickGet(), recvMesg.speed, recvMesg.timeStamp);
kMRMUnget( &MRMCtl, readBufPtr);
kSleepPeriodic( 4);
}
}
VOID WiperTask( VOID *args)
{
RK_UNUSEARGS
Mesg_t recvMesg = {0};
while (1)
{
RK_MRM_BUF *readBufPtr = kMRMGet( &MRMCtl, &recvMesg);
printf( "@ %dT WIPERS: (%u, %uT) \r\n", kTickGet(), recvMesg.speed, recvMesg.timeStamp);
kMRMUnget( &MRMCtl, readBufPtr);
kSleepPeriodic( 8);
}
}
VOID RadioTask( VOID *args)
{
RK_UNUSEARGS
Mesg_t recvMesg = {0};
while (1)
{
RK_MRM_BUF *readBufPtr = kMRMGet( &MRMCtl, &recvMesg);
printf( "@ %dT RADIO: (%u, %uT) \r\n", kTickGet(), recvMesg.speed, recvMesg.timeStamp);
kMRMUnget( &MRMCtl, readBufPtr);
kSleepPeriodic(12);
}
}
Thus, different situations can happen:
-
All tasks read the updated pair (speed, time)
-
Not all tasks receive the updated pair because another update happens in between.
-
No tasks receive an update - because another happens too soon.
-
All tasks receive an update and will keep rereading the same values.
All these cases are on the image:

The highlight is that controllers can keep their pace, while receiving fresh data - you can see it on the timestamp on the image. Again, they might receive the same data more than once or miss samples; what is important is that they are not lagging and consuming stale data. |
6. Error Handling
6.1. Fail fast
While tracing and error handling are yet to be largely improved (and that is when the 1.0.0 version will be released), currently RK0 employs a policy of failing fast in debug mode.
When Error Checking is enabled, every kernel call will be 'defensive', checking for correctness of parameters and invariants, null dereferences, etc.
In these cases it is more useful to allow the first error to halt the execution by calling an Error Handler function to observe the program state.
A trace structure records the address of the running TCB, its current stack pointer, the link register (that is, the PC at which kErrHandler was called), and a time stamp.
This record is on a .noinit RAM section, so it is still visible after a CPU reset. A fault code is stored in a global faultID and on the trace structure.
Developers can hook in custom behaviour.
If the kernel is configured to not halt on a fault, but Error Checking is enabled, functions will return negative values in case of an error.
On the other hand, when Error Checking is disabled or NDEBUG is defined, nothing is checked, reducing code size and improving performance.
(Some deeper internal calls have assertions. For those, only defining NDEBUG ensures they are disabled.)
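To picture the record-then-hook flow, here is a host-side sketch. The layout and every name in it (ToyTrace, toyErrHandler, toySetFaultHook, toyCountingHook) are hypothetical and the real record and handler differ; keeping only the first fault in the non-halting case is also my assumption.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical trace record, mirroring the fields listed in the text. */
typedef struct
{
    const void *tcb; /* address of the running TCB */
    uintptr_t sp;    /* its current stack pointer */
    uintptr_t lr;    /* return address: where the handler was called */
    uint32_t tick;   /* time stamp */
    int32_t fault;   /* fault code (negative) */
} ToyTrace;

/* On a target these would be placed in a .noinit RAM section. */
static ToyTrace toyTrace;
static volatile int32_t toyFaultID;

typedef void (*ToyFaultHook)(const ToyTrace *);
static ToyFaultHook toyHook = NULL;

static void toySetFaultHook(ToyFaultHook hook) { toyHook = hook; }

/* Example hook: count how many times a fault was reported. */
static int toyHookCalls = 0;
static void toyCountingHook(const ToyTrace *t) { (void)t; toyHookCalls++; }

/* Record the fault, run the optional hook, and report the code.
   A halting configuration would stop here instead of returning. */
static int32_t toyErrHandler(int32_t fault, const void *tcb,
                             uintptr_t sp, uintptr_t lr, uint32_t tick)
{
    assert(fault < 0);
    if (toyFaultID == 0) /* assumption: keep the first fault only */
    {
        toyTrace.tcb = tcb;
        toyTrace.sp = sp;
        toyTrace.lr = lr;
        toyTrace.tick = tick;
        toyTrace.fault = fault;
        toyFaultID = fault;
    }
    if (toyHook != NULL)
        toyHook(&toyTrace);
    return fault;
}
```

A hook like toyCountingHook is where an application could, for example, flush the record to a log or request a controlled reset.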
6.2. Stack Overflow
Stack overflow is detected (not prevented) using "stack painting" with a sentinel word. Stack Overflow detection is enabled by defining the assembler preprocessor symbol __K_DEF_STACKOVFLW when compiling.
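As an illustration of the general technique (not of the kernel's own code), the sketch below paints a stack with a sentinel and checks it afterwards. The sentinel value and helper names are hypothetical, and a descending stack is assumed, so index 0 is the last word the stack can legally reach.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical sentinel value. */
#define TOY_STACK_PAINT 0xBADC0FFEu

/* Paint the whole stack before the task first runs. */
static void toyStackPaint(uint32_t *stack, size_t words)
{
    assert(stack != NULL);
    for (size_t i = 0; i < words; ++i)
        stack[i] = TOY_STACK_PAINT;
}

/* Overflow is detected, after the fact, when the sentinel at the
   stack limit (lowest address) has been overwritten. */
static int toyStackOverflowed(const uint32_t *stack)
{
    return stack[0] != TOY_STACK_PAINT;
}

/* Words never touched so far: counts intact paint from the limit upwards. */
static size_t toyStackFreeWords(const uint32_t *stack, size_t words)
{
    size_t freeWords = 0;
    while (freeWords < words && stack[freeWords] == TOY_STACK_PAINT)
        freeWords++;
    return freeWords;
}
```

The free-words count doubles as a high-water mark, which is handy to confirm at run time the budget estimated offline.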
One can take advantage of the static task model - it is possible to predict offline the deepest call within any task. The -fstack-usage compiler flag will create .su files indicating the stack usage of every function within a module. This is an example of the results for a compilation unit:
core/src/ksynch.c:61:8:kSignalGet 112 static
core/src/ksynch.c:163:8:kSignalSet 72 static
core/src/ksynch.c:214:8:kSignalClear 40 static
core/src/ksynch.c:237:8:kSignalQuery 56 static
core/src/ksynch.c:260:8:kEventInit 40 static
core/src/ksynch.c:279:8:kEventSleep 112 static
core/src/ksynch.c:343:8:kEventWake 88 static
core/src/ksynch.c:383:8:kEventSignal 64 static
core/src/ksynch.c:412:7:kEventQuery 16 static
core/src/ksynch.c:429:8:kSemaInit 88 static
core/src/ksynch.c:470:8:kSemaPend 96 static
core/src/ksynch.c:555:8:kSemaPost 88 static
core/src/ksynch.c:603:8:kSemaWake 72 static
core/src/ksynch.c:640:5:kSemaQuery 40 static
core/src/ksynch.c:662:8:kMutexInit 16 static
core/src/ksynch.c:681:8:kMutexLock 120 static
core/src/ksynch.c:761:8:kMutexUnlock 96 static
core/src/ksynch.c:834:6:kMutexQuery 56 static
These are the worst cases. Now, you identify the depth of the longest chain of calls for a task using these services and add a generous safety margin, say 30%. The cap depends on your budget.
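The arithmetic can be sketched as follows: sum the frame sizes along the deepest call chain, add the margin, and round up. The helper name, the 8-byte rounding and the sample chain are assumptions for illustration; any context the kernel or hardware pushes onto the task stack would come on top of this figure.

```c
#include <assert.h>
#include <stddef.h>

#define MARGIN_PERCENT 30U /* the safety margin suggested in the text */
#define STACK_ALIGN 8U     /* assumed rounding of the stack size, in bytes */

/* frames[] holds the per-function stack usage (as reported in the .su
   files) for each function on the deepest call chain of a task. */
static size_t stackBudgetBytes(const size_t *frames, size_t nFrames)
{
    size_t sum = 0;
    assert(frames != NULL);
    for (size_t i = 0; i < nFrames; ++i)
        sum += frames[i];
    /* add the margin, rounding the percentage up */
    size_t budget = sum + (sum * MARGIN_PERCENT + 99U) / 100U;
    /* round up to the alignment */
    return (budget + (STACK_ALIGN - 1U)) & ~(size_t)(STACK_ALIGN - 1U);
}
```

For a hypothetical chain of a 64-byte task body calling kSemaPend (96 bytes in the listing) that in turn calls a 40-byte internal function, the sum is 200 bytes, the margin adds 60, and rounding gives 264 bytes.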
Importantly, you also have to size the System Stack. Its initial size is defined in linker.ld, through the symbol Min_Stack_Size. Here, you need to account for the depth of main(), kApplicationInit(), and all interrupt handlers; again, inspect the longest call chain. Assume interrupts will always add to the worst static depth, and make sure to account for nested interrupts.
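The arithmetic for a task stack can be sketched as follows. The helper name, the 8-byte rounding and the example call chain are assumptions made for illustration; only the 120 bytes of kMutexLock comes from the listing above.

```c
#include <assert.h>
#include <stddef.h>

/* Sum the per-function stack usage (bytes, as reported in the .su files)
 * along one call chain, add a percentage margin, round up to 8 bytes. */
size_t demo_stack_budget(const size_t *frames, size_t n, unsigned margin_pct)
{
    assert(frames != NULL || n == 0);
    size_t total = 0;
    for (size_t i = 0; i < n; i++)
        total += frames[i];
    total += (total * margin_pct + 99) / 100; /* margin, rounded up */
    return (total + 7u) & ~(size_t)7u;        /* 8-byte alignment */
}
```

For a hypothetical chain of a 48-byte task body calling kMutexLock (120 bytes), which in turn calls a 64-byte internal helper, the sum is 232 bytes and a 30% margin brings the budget to 304 bytes. The sketch does not include any register context saved on the task's stack when it is preempted or interrupted; that has to be added on top.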
6.3. Deadlocks
The literature describes deadlock recovery mechanisms; unfortunately, they are unfeasible here. The kernel provides bounded waiting, enforces priority discipline on every waiting queue, and applies priority inheritance to mutexes and message passing. It also provides lock-free primitives and compensates for time drift when a period is enforced on tasks. Still, none of these techniques can prevent deadlocks (and, right, with bounded blocking and lock-free primitives one can get livelocks instead).
-
Ordered Locking:
For those programming the application: besides following the RMS rule of giving higher priority to tasks with a higher request rate, the golden rule for locking is to acquire resources in a single, fixed order throughout the entire application:
acquire(A);
acquire(B);
acquire(C);
.
.
.
release(C);
release(B);
release(A);
This breaks circular waiting.
For instance:
TaskA:
wait(R1);
wait(R2);
/* critical section */
signal(R2);
signal(R1);
TaskB:
wait(R1);
wait(R2);
/* critical section */
signal(R2);
signal(R1);
But, if:
TaskA:
wait(R1);
wait(R2);
.
.
TaskB:
wait(R2);
wait(R1);
.
.
There are some possible outcomes:
-
Deadlock:
-
TaskA runs: acquires R1
-
TaskB runs: acquires R2
-
TaskA runs: tries to acquire R2 — blocked
-
TaskB runs: tries to acquire R1 — blocked
-
-
No deadlock:
-
TaskA runs: acquires R1
-
TaskA runs: acquires R2 (nobody is holding R2)
-
TaskA releases both; TaskB runs and acquires both (in either order)
-
Overall, there is no deadlock as long as the tasks do not overlap in their critical sections. That is why systems can run for years without deadlocking and then, eventually: ploft.
-
Use a 'master-lock' with 'try' semantics:
Another technique, when one needs to acquire multiple locks, is to acquire them all or none using a try-lock (RK_NO_WAIT). If any of the tries fails, the task gives up, releases every lock it did acquire, and backs off to retry later (most simply, using a sleep queue). That is easier said than done, though, and, as mentioned, if not done well, one gets livelocks instead of deadlocks.
(A livelock is when a couple of tasks keep running, but the program does not advance.)
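The all-or-none idea can be sketched as below. To keep the example self-contained, the locks are modelled by a plain flag: demo_lock_t and the demo_ functions are hypothetical stand-ins for kernel mutexes taken with a no-wait timeout, and the atomicity a real try-lock provides is assumed rather than shown.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a kernel mutex; not an RK0 type. */
typedef struct { bool held; } demo_lock_t;

/* Non-blocking acquire: succeeds only if the lock is free. */
bool demo_try_lock(demo_lock_t *l)
{
    if (l->held)
        return false;
    l->held = true;
    return true;
}

void demo_unlock(demo_lock_t *l)
{
    l->held = false;
}

/* Acquire every lock in the set, or none of them. */
bool demo_lock_all(demo_lock_t *locks[], size_t n)
{
    assert(locks != NULL || n == 0);
    for (size_t i = 0; i < n; i++)
    {
        if (!demo_try_lock(locks[i]))
        {
            /* Back off: release what was taken, in reverse order. */
            while (i > 0)
                demo_unlock(locks[--i]);
            return false; /* caller sleeps or yields, then retries */
        }
    }
    return true;
}
```

What the caller does on a false return decides whether this works: retrying immediately, in lockstep with a competing task, is precisely how the livelock arises.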
7. RK0 Services API
Convention
-
A kernel call starts with a lowercase
k
. Typically it is followed by a kernel object identifier and an action.
kSemaPend(&sema, 800); /* pend on a semaphore; 800 ticks time-out */
-
When
k
is followed by an action, it is acting on the caller task.
kSleep(150); /* sleep-delay the caller task for 150 ticks */
-
Some calls can act either on the caller or on another task:
/* stores the signal flags of the task identified by task1Handle on queryValue */
kSignalQuery(task1Handle, &queryValue);
/* retrieves its own signal flags */
kSignalQuery(NULL, &queryValue);
Return Values
With a few exceptions, kernel calls return an RK_ERR error code. 0 indicates a successful operation (RK_SUCCESS), and any negative value is an error that indicates failure.
A positive value indicates an unsuccessful operation that will not lead the system to failure (e.g., any unsuccessful try operation).
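A caller can branch on the sign alone. The sketch below assumes RK_ERR converts to a signed int; the enum and function names are hypothetical and stand in for whatever handling the application needs.

```c
#include <assert.h>

/* Hypothetical outcome classes; the real codes are defined by RK_ERR. */
typedef enum { DEMO_OK, DEMO_UNSUCCESSFUL, DEMO_FAULT } demo_outcome_t;

/* Map a return value onto the three classes of the convention. */
demo_outcome_t demo_classify(int err)
{
    if (err == 0)
        return DEMO_OK;           /* successful operation */
    if (err > 0)
        return DEMO_UNSUCCESSFUL; /* e.g. a failed try; retry or move on */
    return DEMO_FAULT;            /* negative: a genuine error */
}
```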
-
Find the most up to date RK0 API at the repo: RK0 API
8. Scheduler Determinism
8.1. Preemptive Scheduling
This is a simple test to establish some evidence that the scheduler obeys the pre-emption criterion: a higher priority task always pre-empts a lower priority task.
8.1.1. Using Direct Signals
Tasks 1, 2, 3, and 4 are in descending order of priority. If the scheduler is well-behaved, we shall see counters differing by "1."
VOID Task1(VOID* args)
{
RK_UNUSEARGS
while(1)
{
counter1++;
kPend(RK_WAIT_FOREVER);
}
}
VOID Task2(VOID* args)
{
RK_UNUSEARGS
while(1)
{
kSignal(task1Handle); /* shall immediately be preempted by task1 */
counter2++;
kPend(RK_WAIT_FOREVER); /* suspends again */
}
}
VOID Task3(VOID* args)
{
RK_UNUSEARGS
while(1)
{
kSignal(task2Handle); /* shall immediately be preempted by task2 */
counter3++;
kPend(RK_WAIT_FOREVER); /* suspends again */
}
}
VOID Task4(VOID* args)
{
RK_UNUSEARGS
while(1)
{
/* shall immediately be preempted by task3 */
kSignal(task3Handle);
/* only resumes after all tasks are pending again */
counter4++;
}
}
This is the output after some time running:

8.1.2. Using Semaphores
RK_SEMA sema1;
RK_SEMA sema2;
RK_SEMA sema3;
RK_SEMA sema4;
VOID kApplicationInit(VOID)
{
kSemaInit(&sema1, RK_SEMA_COUNT, 0);
kSemaInit(&sema2, RK_SEMA_COUNT, 0);
kSemaInit(&sema3, RK_SEMA_COUNT, 0);
kSemaInit(&sema4, RK_SEMA_COUNT, 0);
}
VOID Task1(VOID* args)
{
RK_UNUSEARGS
while (1)
{
counter1++;
kSemaWait(&sema1, RK_WAIT_FOREVER);
}
}
VOID Task2(VOID* args)
{
RK_UNUSEARGS
while (1)
{
kSemaSignal(&sema1);
counter2++;
kSemaWait(&sema2, RK_WAIT_FOREVER);
}
}
VOID Task3(VOID* args)
{
RK_UNUSEARGS
while (1)
{
kSemaSignal(&sema2);
counter3++;
kSemaWait(&sema3, RK_WAIT_FOREVER);
}
}
VOID Task4(VOID* args)
{
RK_UNUSEARGS
while (1)
{
kSemaSignal(&sema3);
counter4++;
}
}

Here, the tick is running @ 0.5us
8.1.3. Using Mailboxes
/**************************************************
* Mailboxes are initialised empty.
* kMboxInit(&mbox1, NULL);
* kMboxInit(&mbox2, NULL);
* kMboxInit(&mbox3, NULL);
* kMboxInit(&mbox4, NULL);
* kMboxInit(&mbox5, NULL);
*
* Highest Prio (Task1), Lowest (Task5)
*
* Using Mailboxes to pass tokens as signals.
*
**************************************************/
VOID Task1( VOID *args)
{
RK_UNUSEARGS
UINT *p;
while (1)
{
counter1++;
kMboxPend(&mbox1, (VOID*) &p, RK_WAIT_FOREVER);
}
}
VOID Task2( VOID *args)
{
RK_UNUSEARGS
UINT mesg = 1;
UINT *p;
while (1)
{
kMboxPost(&mbox1, &mesg, RK_WAIT_FOREVER);
counter2++;
kMboxPend(&mbox2, (VOID*) &p, RK_WAIT_FOREVER);
}
}
VOID Task3( VOID *args)
{
RK_UNUSEARGS
UINT mesg = 1;
UINT *p;
while (1)
{
kMboxPost(&mbox2, &mesg, RK_WAIT_FOREVER);
counter3++;
kMboxPend(&mbox3, (VOID*) &p, RK_WAIT_FOREVER);
}
}
VOID Task4( VOID *args)
{
RK_UNUSEARGS
UINT mesg = 1;
UINT *p;
while (1)
{
kMboxPost(&mbox3, &mesg, RK_WAIT_FOREVER);
counter4++;
kMboxPend(&mbox4, (VOID*) &p, RK_WAIT_FOREVER);
}
}
VOID Task5( VOID *args)
{
RK_UNUSEARGS
UINT mesg=1;
while(1)
{
kMboxPost(&mbox4, &mesg, RK_WAIT_FOREVER);
counter5++;
}
}
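All three variants rely on the same observable invariant: once every higher priority task is blocked again, each counter is exactly one ahead of the counter of the next lower priority task. A check along these lines could be run right after the lowest priority task increments its counter. The function name and the sampling point are my assumptions; they are not part of the tests above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* counters[0] belongs to the highest priority task, counters[n-1] to the
 * lowest. Returns true if every adjacent pair differs by exactly one. */
bool demo_counters_in_lockstep(const unsigned *counters, size_t n)
{
    assert(counters != NULL || n == 0);
    for (size_t i = 0; i + 1 < n; i++)
    {
        /* Unsigned subtraction stays correct across counter wrap-around. */
        if (counters[i] - counters[i + 1] != 1u)
            return false;
    }
    return true;
}
```

Sampling at an arbitrary instant, for example by halting in a debugger in the middle of a wake-up cascade, can show a transient difference of two, so the sampling point matters.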

9. Influences
For the sake of intellectual honesty and acknowledgment, I list here the systems that have inspired RK0's (still incipient) design:
-
The purely 'RTOSish' parts are heavily influenced by Nucleus, ThreadX and uC/OS.
-
The MRM Protocol is inspired by HARTIK RTOS.
-
Doubly Linked Lists and Delta-Lists are inspired by Linux 2.6.x and 4.4BSD, respectively.

© 2025 Antonio Giacomelli | All Rights Reserved | www.kernel0.org