1. THE KERNEL AT A GLANCE

1.1. The design approach

RK0 is the result of independent design, a lot of research plus (a whole other lot of) learning by doing.

While I may not have seen every edge case, building it from scratch has given me a unique perspective. I invite peers to review and challenge what’s been done so far.

In the embedded realm, probably because we lack a better abstraction, we use multithreading to fine-tune our load balance and responsiveness to achieve real-time.

This is an arrangement: instead of having a single super-loop, we have many, each running on its execution stack.

This arrangement yields an operating system entity to handle, a (logical) Concurrency Unit. In RK0, we name it a Task (in RK0, a task is a thread).

1.1.1. Architecture

At the coarsest level of detail, the kernel has a top and a bottom layer. On the top, the Executive manages the resources needed by the application. On the bottom, the Low-level Scheduler works as a software extension of the CPU. Together, they implement the Task abstraction: the Concurrency Unit that enables a multitasking environment.

[Figure: layered kernel]

In systems design jargon, the Executive enforces policy (what should happen). The Low-level Scheduler provides the mechanism (how it gets done). The services are the primitives that gradually translate policy decisions into concrete actions executed by the Scheduler. RK0’s goal is determinism on low-end devices. Its multitasking engine does not mimic a userland: tasks run in privileged mode on a different stack pointer from the system stack.

1.1.2. Suitable Applications

Given the architecture, RK0 targets applications with the following characteristics:

  1. They are designed to handle particular devices in which real-time responsiveness is imperative.

  2. Applications and middleware may be implemented alongside appropriate drivers.

  3. Drivers may even include the application itself.

  4. Untested programs are not loaded: After the software has been tested, it can be assumed reliable.

2. Core Mechanisms

This section provides a high-level description of the Core Mechanisms: Scheduler, Timers and Memory Allocator.

2.1. Scheduler

RK0 employs a Rate Monotonic Scheduler. Tasks are assigned priorities according to their request rates, i.e., tasks with shorter periods are assigned higher priorities. The highest priority is represented by the value '0'; the lowest is represented by the value '31'.

A notable property of the scheduler is its constant time complexity (O(1)) and low latency. This was achieved by carefully composing the data structures along with an efficient 'choose-next' algorithm. This is detailed below.

Time-slicing was deprecated in version 0.5.0.

2.1.1. Scheduler Data Structures

2.1.1.1. Task Control Block

Threads are represented as Tasks. Every task is associated with a Task Control Block structure, which is a record for stack, resources, and time management. The table below partially represents a Task Control Block (as this document is live, this might not reflect the exact fields of the current version).

Task Control Block

Task name

Saved Stack Pointer

Stack Address

Stack Size

Status

Assigned Priority

Current Priority

Self-Assigned ID

Last wake-time

Run-To-Completion Flag

Time-out Flag

List of owned Mutexes

Aggregated Timeout Node

Aggregated Task List Node

Tasks are static: they cannot be created at runtime, destroyed, forked, or joined.

In practice, tasks are either RUNNING or 'waiting' for their turn to run. We need to define WAITING and READY clearly.

  1. A READY task will be dispatched, that is, switched to RUNNING, whenever it is the highest priority READY task.

  2. A WAITING task depends on a condition, generalised as an event, to switch to READY.

[Figure: task states]

Logically, the WAITING state will assume different pseudo-states related to the kind of event that will switch a task to READY:

  • SLEEPING: a task suspends itself and goes to sleep for a given period or suspends itself until receiving a wake signal, representing an event.

  • PENDING: the task suspended itself, waiting for a combination of signal flags.

  • BLOCKED: A task is blocked on a mutex or semaphore.

  • SENDING/RECEIVING: A producer task, when blocking on a Message Passing object, switches its status to SENDING, and a consumer to RECEIVING.

The scheduler rules, not the heap.

RK0 tasks are static.

It’s a design decision rooted in real-time correctness.

Stacks are defined and passed explicitly when creating a task.

The wins:

  • A memory layout the systems programmer knows.

  • No alignment traps.

  • Link-time visibility:

    • Each task’s stack is a named symbol in the linker map.

    • You can inspect and verify the memory layout before flashing.

    • A simple objdump reveals all stack allocations — that’s peace of mind.

[Figure: scheduler data structures]

2.1.1.2. Task Queues

The backbone of the queues where tasks wait for their turn to run is a circular doubly linked list: removing any item from a doubly linked list takes O(1) (provided we don’t need to search for the item). As the kernel knows each task’s address, adding and removing is always O(1). Singly linked lists can’t achieve O(1) for removing an arbitrary node, because the predecessor must first be found.
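As a minimal sketch of why removal is O(1), assume a circular list anchored by a sentinel node. The names Node, listInit, listAddTail, listRemove and listIsEmpty are hypothetical and are not RK0's API; the kernel's own list types may differ.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical node type for illustration; not RK0's RK_NODE. */
typedef struct Node
{
    struct Node *next;
    struct Node *prev;
} Node;

/* An empty circular list is a sentinel that points to itself. */
static void listInit(Node *sentinel)
{
    sentinel->next = sentinel;
    sentinel->prev = sentinel;
}

/* Insert right before the sentinel, i.e. at the tail: O(1). */
static void listAddTail(Node *sentinel, Node *node)
{
    assert(node != NULL);
    node->prev = sentinel->prev;
    node->next = sentinel;
    sentinel->prev->next = node;
    sentinel->prev = node;
}

/* Unlink a node whose address is already known: O(1), no traversal. */
static void listRemove(Node *node)
{
    assert(node != NULL);
    node->prev->next = node->next;
    node->next->prev = node->prev;
    node->next = node;
    node->prev = node;
}

static int listIsEmpty(const Node *sentinel)
{
    return (sentinel->next == sentinel);
}
```

Because each node carries its own prev pointer, listRemove never walks the list. With a singly linked list the same operation would need a scan to find the predecessor.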

2.1.1.3. Ready Queue Table

Another design choice to achieve O(1) is the global ready queue, which is a table of FIFO queues—each queue dedicated to a priority—and not a single ordered queue. So, enqueuing a ready task is always O(1). Given the sorting needed, the time complexity would be O(n) if tasks were placed on a single ready queue.

2.1.1.4. Waiting Queues

The scheduler does not have a unique waiting queue. Every kernel object that can block a task has an associated waiting queue. Because these queues are a scheduler component, they follow a priority discipline: the highest priority task is dequeued first, always.

When an event capable of switching tasks from WAITING to READY happens, one or more tasks (depending on the mechanism) are placed on the ready queue of their priority. Now, they are waiting to be picked by the scheduler: that is the definition of READY.

2.1.2. The scheduling algorithm

As the ready queue table is indexed by priority - the index 0 points to the queue of ready tasks with priority 0, and so forth, and there are 32 possible priorities - a 32-bit integer can represent the state of the ready queue table. It is a BITMAP:

The BITMAP computation: ((1a) OR (1b)) AND (2), s.t.:

(1a) Every Time a task is readied, update: BITMAP |= (1U << task->priority );
(1b) Every Time an empty READY QUEUE becomes non-empty, update: BITMAP |= (1U << queueIndex)
(2): Every Time READY QUEUE becomes empty, update: BITMAP &= ~(1U << queueIndex);
EXAMPLE:

  Ready Queue Index :     (6)5 4 3 2 1 0
          Not empty :      1 1 1 0 0 1 0
                           ------------->
                 (LOW)  Effective Priority  (HIGH)
In this case, the scenario is a system with 7 priority task levels. Queues with priorities 6, 5, 4, and 1 are not empty.

In the RK0 source code, the following routines implement the bitmap update:

/* Enqueue a TCB on the tail of a TCB list */
RK_ERR kTCBQEnq(RK_TCBQ *const kobj, RK_TCB *const tcbPtr)
{
    RK_ERR err = kListAddTail(kobj, &(tcbPtr->tcbNode));
    if (err == 0)
    {
        /* if a task was enqueued on a list within the ready queue table, update the 'ready bitmap' */
        if (kobj == &readyQueue[tcbPtr->priority])
        {
            /* unsigned literal: shifting a signed 1 by 31 is undefined */
            readyQBitMask |= (1U << tcbPtr->priority);
        }
    }
    return (err);
}

/* Add a TCB on the head of the TCB list  */
RK_ERR kTCBQJam(RK_TCBQ *const kobj, RK_TCB *const tcbPtr)
{
	RK_ERR err = kListAddHead(kobj, &(tcbPtr->tcbNode));
	if (err == 0)
	{
		/* same bitmap update as on enqueue */
		if (kobj == &readyQueue[tcbPtr->priority])
		{
			readyQBitMask |= (1U << tcbPtr->priority);
		}
	}
	return (err);
}

/* Dequeue the head task from a list of TCBs */
RK_ERR kTCBQDeq(RK_TCBQ *const kobj, RK_TCB **const tcbPPtr)
{
    RK_NODE *dequeuedNodePtr = NULL;
    RK_ERR err = kListRemoveHead(kobj, &dequeuedNodePtr);
    if (err != RK_SUCCESS)
    {
        return (err);
    }
    *tcbPPtr = K_GET_TCB_ADDR(dequeuedNodePtr, RK_TCB);

    RK_TCB *tcbPtr_ = *tcbPPtr;
    RK_PRIO prio_ = tcbPtr_->priority;

    /* if the list is in the ready queue table and is now empty
     update 'ready bitmap' */
    if ((kobj == &readyQueue[prio_]) && (kobj->size == 0))
    {
        readyQBitMask &= ~(1U << prio_);
    }
    return (err);
}

/* Remove a specific TCB from a TCB List */
RK_ERR kTCBQRem(RK_TCBQ *const kobj, RK_TCB **const tcbPPtr)
{
    RK_NODE *dequeuedNodePtr = &((*tcbPPtr)->tcbNode);
    RK_ERR err = kListRemove(kobj, dequeuedNodePtr);
    if (err != RK_SUCCESS)
    {
        return (err);
    }
    *tcbPPtr = K_GET_TCB_ADDR(dequeuedNodePtr, RK_TCB);

    RK_TCB *tcbPtr_ = *tcbPPtr;
    RK_PRIO prio_ = tcbPtr_->priority;
    if ((kobj == &readyQueue[prio_]) && (kobj->size == 0))
    {
          readyQBitMask &= ~(1U << prio_);
    }
    return (err);
}

Having the Ready Queue Table bitmap, we find the highest priority non-empty task list as follows:

(1) Isolate the rightmost '1':

RBITMAP = BITMAP & -BITMAP (unary minus yields the two's complement: -BITMAP = ~BITMAP + 1)

In this case:

                           [31]       [0]  :  Bit Position
                             0...1110010   :  BITMAP
                             1...0001110   : -BITMAP
                            =============
                             0...0000010   :  RBITMAP
                                     [1]

The rationale here is that, for a number N, its two’s complement -N flips every bit to the left of the rightmost '1' and leaves that '1' and the zeroes to its right unchanged (this is the effect of adding 1 to ~N). Then, N & -N results in a word with all 0-bits except for the least significant '1'.

(2) Extract the rightmost '1' position:

  • For ARMv7M, we benefit from the CLZ instruction, which counts the leading zeroes. Since RBITMAP holds a single '1', the leading-zero count is the number of zeroes to the left of that bit, so subtracting it from 31 gives the Ready Queue index.

__RK_INLINE static inline
unsigned __getReadyPrio(unsigned readyQBitmap)
{
    unsigned ret;
    __ASM volatile (
        "clz    %0, %1     \n"
        "neg    %0, %0     \n"
        "add    %0, %0, #31\n"
        : "=&r" (ret)
        : "r" (readyQBitmap)
        :
    );
    return (ret);
}

This instruction would return #30, and #31 - #30 = #01 in the example above.

  • For ARMv6M there is no suitable hardware instruction. The algorithm is written in C and counts the trailing zeroes, thus, the index number. Although it might vary depending on your compiler settings, it takes ~11 cycles (note it is still O(1)):

/*
  De Bruijn's multiply+LUT
  (Hacker's Delight book)
*/

/* table is on a ram section  for efficiency */
__K_SECTION(getReadyTable)
static const unsigned table[32] =
{
 0, 1, 28, 2, 29, 14, 24, 3, 30, 22, 20, 15, 25, 17, 4, 8,
 31, 27, 13, 23, 21, 19, 16, 7, 26, 12, 18, 6, 11, 5, 10, 9
};

__RK_INLINE static inline
unsigned __getReadyPrio(unsigned readyQBitmap)
{
    unsigned mult = readyQBitmap * 0x077CB531U;

    /* Shift right the top 5 bits
     */
    unsigned idx = (mult >> 27);

    /* LUT */
    unsigned ret = (unsigned)table[idx];
    return (ret);
}

For the example above, mult = 0x2 * 0x077CB531 = 0x0EF96A62. The 5 leftmost bits (the index) are 00001, so table[1] = 1.
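The two steps can be checked on a host machine with the portable sketch below. It reuses the multiplier and table values from the listing above; the function names (isolateLowestSetBit, lowestSetBitIndex, deBruijnTableIsConsistent) are made up for this illustration and are not part of RK0.

```c
#include <assert.h>
#include <stdint.h>

/* Same values as the table in the ARMv6M listing. */
static const uint8_t deBruijnTable[32] =
{
    0, 1, 28, 2, 29, 14, 24, 3, 30, 22, 20, 15, 25, 17, 4, 8,
    31, 27, 13, 23, 21, 19, 16, 7, 26, 12, 18, 6, 11, 5, 10, 9
};

/* Step (1): keep only the least significant set bit.
   Unsigned negation is well defined: 0 - x == ~x + 1 (mod 2^32). */
static uint32_t isolateLowestSetBit(uint32_t bitmap)
{
    return (bitmap & (uint32_t)(0U - bitmap));
}

/* Step (2): map the isolated bit to its position; bitmap must be non-zero. */
static unsigned lowestSetBitIndex(uint32_t bitmap)
{
    assert(bitmap != 0U);
    uint32_t isolated = isolateLowestSetBit(bitmap);
    return deBruijnTable[(uint32_t)(isolated * 0x077CB531U) >> 27];
}

/* Returns 1 if the lookup yields i for every single-bit word (1U << i). */
static int deBruijnTableIsConsistent(void)
{
    for (unsigned i = 0U; i < 32U; i++)
    {
        if (lowestSetBitIndex((uint32_t)1U << i) != i)
        {
            return 0;
        }
    }
    return 1;
}
```

For the example bitmap 0...1110010 (0x72), isolateLowestSetBit gives 0x02 and lowestSetBitIndex gives 1, matching the worked values above.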

During a context switch, the procedures to find the highest priority non-empty ready queue table index are as follows:

static inline RK_PRIO kCalcNextTaskPrio_(VOID)
{
    if (readyQBitMask == 0U)
    {
        return (idleTaskPrio);
    }
    readyQRightMask = readyQBitMask & -readyQBitMask;
    RK_PRIO prioVal = (RK_PRIO) (__getReadyPrio(readyQRightMask));
    return (prioVal);
}

VOID kSchSwtch(VOID)
{
    /* O(1) complexity */
	nextTaskPrio = kCalcNextTaskPrio_();

	RK_TCB* nextRunPtr = NULL;

	/* O(1) complexity */
	kTCBQDeq(&readyQueue[nextTaskPrio], &nextRunPtr);

	runPtr = nextRunPtr;

}

2.1.3. System Tasks

There are two System Tasks: the Idle Task and the Timer Handler Task.

The Idle Task runs whenever there is no other ready task to be dispatched, and the CPU enters a low-power state. The kernel assigns the Idle Task priority during initialisation, taking into account all priorities the user has defined. Unless user tasks occupy all 32 priorities, the Idle Task is treated as an ordinary task with the lowest priority and has a position in the ready queue table. Otherwise, it is selected when the Ready Queue Bitmap is 0x00000000.

The Timer Handler Task is dispatched whenever an Application Timer expires to run a Callout function. Nominally its priority is 0, but in practice it could be considered as having priority -1, because it always takes precedence over other tasks with priority 0.

2.1.4. Handling the scheduler

An essential characteristic of the scheduler is that it is a preemptive run-to-completion scheduler. This term, 'run-to-completion' has slightly different meanings depending on the context. It is often related to strictly cooperative schedulers, in the sense tasks must yield the processor. Otherwise, they monopolise the CPU.

In RK0, tasks with the same priority will run on a First-In-First-Out discipline. This is different from schedulers that employ a time-slice or a quantum for tasks, so they are forced to yield after a period and are put at the tail of the Ready Queue.

The term run-to-completion here is to be interpreted as follows:

  • The scheduler’s behaviour is to choose the highest priority READY task to run. Always.

  • The scheduler works on a First-In-First-Out discipline for tasks with the same priority.

  • A task must switch to the READY state before being eligible for scheduling.

  • A task will switch from RUNNING to READY if yielding or if being preempted by a higher priority task.

  • Otherwise it can only go to a WAITING state, and eventually switch back to READY.

  • When a task is preempted, it switches from RUNNING to READY and is placed back on the head position of its Ready Queue. This means that it will be resumed as soon as it is the highest priority ready task again.

  • On the contrary, if a task yields, it tells the scheduler it has completed its cycle. Then, it will be enqueued on the ready queue tail - the last queue position.

  • When a task waits it is suspended until a condition is satisfied.

  • When the condition is satisfied, it switches from WAITING to READY, and is enqueued on the tail.

  • So, tasks with the same priority will round-robin as long as they yield or wait.

  • If a task never yields or waits, other tasks with the same or lower priority will starve.

  • Finally, Tasks with the same priority are initially placed on the Ready Queue associated with that priority in the order they are created.
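The head/tail rules above can be illustrated with a toy model of a single ready queue (one priority level). This is only a sketch: ReadyQ, QCAP and the three functions are hypothetical names, and the ring of task ids stands in for the kernel's linked list of TCBs.

```c
#include <assert.h>

#define QCAP 8U /* hypothetical capacity for this sketch */

/* One ready queue (one priority level) as a small ring of task ids. */
typedef struct
{
    int ids[QCAP];
    unsigned head;
    unsigned size;
} ReadyQ;

/* A task that yielded, or that left WAITING, goes to the tail. */
static void readyEnqueueTail(ReadyQ *q, int id)
{
    assert(q->size < QCAP);
    q->ids[(q->head + q->size) % QCAP] = id;
    q->size++;
}

/* A preempted task goes back to the head, so it is resumed first. */
static void readyJamHead(ReadyQ *q, int id)
{
    assert(q->size < QCAP);
    q->head = (q->head + QCAP - 1U) % QCAP;
    q->ids[q->head] = id;
    q->size++;
}

/* The scheduler always dispatches the task at the head. */
static int readyDequeue(ReadyQ *q)
{
    assert(q->size > 0U);
    int id = q->ids[q->head];
    q->head = (q->head + 1U) % QCAP;
    q->size--;
    return id;
}
```

With tasks 2 and 3 already queued, a preempted task 1 that is jammed at the head is dispatched next; if task 1 then yields, it is enqueued behind 2 and 3, which is what produces the round-robin effect among equal-priority tasks.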

RK0 can handle context-switching with an extended frame when a floating-point co-processor is available. This must be indicated at compile time by defining the symbol __FPU_PRESENT=1.

2.2. Timers

Context switching is probably the most significant overhead on a kernel. The time spent on the System Tick handler contributes to much of this overhead.

Design Choice:

  • Timers are kept on a single list; only the head element needs to be updated using a delta-queue approach.

  • Application Timers that trigger callbacks are run on a deferred, non-preemptible system task.

Benefits:

  • Keep the overhead of updating timers as minimal as possible with the delta queue;

  • Deferring the Application Timer to a high-priority, non-preemptible system task meets the requested callback period while keeping the ability to track system ticks.

Timeout Node

Timeout Type

Absolute Interval (Ticks)

Relative Interval (Ticks)

Waiting Queue Address

Next Timeout Node

Previous Timeout Node

Every task is prone to events triggered by timers described in this section. Every Task Control Block has a node to a timeout list.

This list is doubly linked, ordered as a delta list. For instance, three timers (T1,8), (T2,6) and (T3,10) will be ordered as a sequence <(T2,6), (T1,2), (T3,2)> - so it counts <6, (6)+2, ((6)+2)+2>.

Thus, for every system tick, only the head element on the list needs to be decreased - yielding O(1) - another design choice towards deterministic behaviour.
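The delta ordering can be sketched as follows. For brevity this sketch uses a singly linked list, whereas the kernel's list is doubly linked; Timeout, deltaInsert and deltaTick are hypothetical names, not RK0's API.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical timeout node for illustration. */
typedef struct Timeout
{
    unsigned dticks;       /* ticks relative to the previous node */
    int expired;           /* set to 1 when the timeout fires */
    struct Timeout *next;
} Timeout;

/* Insert 'node' so that it fires 'ticks' ticks from now.
   Insertion walks the list; only the per-tick update is O(1). */
static void deltaInsert(Timeout **head, Timeout *node, unsigned ticks)
{
    assert((head != NULL) && (node != NULL));
    Timeout **link = head;
    while ((*link != NULL) && ((*link)->dticks <= ticks))
    {
        ticks -= (*link)->dticks;
        link = &(*link)->next;
    }
    node->dticks = ticks;
    node->expired = 0;
    node->next = *link;
    if (*link != NULL)
    {
        /* the successor is now relative to the new node */
        (*link)->dticks -= ticks;
    }
    *link = node;
}

/* One system tick: only the head is decremented.
   Every node whose delta reached zero is popped and marked expired. */
static void deltaTick(Timeout **head)
{
    if (*head == NULL)
    {
        return;
    }
    if ((*head)->dticks > 0U)
    {
        (*head)->dticks--;
    }
    while ((*head != NULL) && ((*head)->dticks == 0U))
    {
        Timeout *done = *head;
        *head = done->next;
        done->next = NULL;
        done->expired = 1;
    }
}
```

Inserting (T1,8), (T2,6) and (T3,10) in that order yields the sequence <(T2,6), (T1,2), (T3,2)> from the example above.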

2.2.1. Sleep Timers

  • kSleep(ticks): suspends the calling task immediately. When readied, the task is dispatched once it is the highest priority ready task. On every call it is suspended for the same number of ticks, no matter how many ticks have elapsed between one activation and another.

  • kSleepUntil(absolute_ticks): suspends the calling task and adjusts the sleeping time at every activation to compensate for time drifts.
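The difference between the two can be shown with plain tick arithmetic. The sketch below is not the kernel's implementation: Tick, nextWakeRelative and nextWakePeriodic are hypothetical names, and it assumes the drift compensation works by anchoring each wake-up to the previous wake time (the Task Control Block does keep a last wake-time field).

```c
#include <assert.h>
#include <stddef.h>

typedef unsigned long Tick; /* hypothetical tick type for this sketch */

/* Relative sleep: the wake-up is counted from the moment of the call,
   so the time the task spent working is added to every cycle. */
static Tick nextWakeRelative(Tick now, Tick ticks)
{
    return now + ticks;
}

/* Drift-compensated sleep (assumed behaviour): the wake-up is counted
   from the previous wake time, so the cycle length stays 'period'. */
static Tick nextWakePeriodic(Tick *lastWake, Tick period)
{
    assert(lastWake != NULL);
    *lastWake += period;
    return *lastWake;
}
```

If a task works for 3 ticks and then sleeps for 10, the relative variant wakes it at ticks 13, 26, and so on, while the compensated variant wakes it at ticks 10, 20, and so on.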

2.2.2. Blocking Time-out

These are timers associated with blocking kernel calls, which may benefit from an upper bound on the waiting time. When that time is up, the kernel call returns, indicating a timeout. When blocking is associated with a kernel object (other than the Task Control Block), the timeout node stores the address of the object's waiting queue, so the task can be removed from it if the time expires.

2.2.3. Callout Timers (Application Timers)

Timer Control Block

Option: Reload/One-Shot

Phase (Initial Delay)

Callout Function Pointer

Callout Argument

Timeout Node

These are Application Timers that will issue a callback when expiring. In addition to a callout function, an Application Timer receives an initial phase delay and a period and can choose to run once (one-shot) or auto-reload itself.

The callback runs within a System Task with priority 0 and is non-preemptible, which makes the scheduler prioritise it over other tasks. Callouts must be short and non-blocking, as they can cause high CPU contention.

For clarity, Timer Callouts are on a separate list in the kernel, although they share the same TIMEOUT node.

2.3. System Tick

A dedicated peripheral that generates an interrupt after a defined period provides the kernel time reference. For ARMv6/7M, this peripheral is the built-in SysTick, a 24-bit counter timer. The handler performs some housekeeping on every tick and assesses the need to call a context switch.

The "housekeeping" accounts for global timer tracking and any tick-dependent condition that might change a task status. When a timer expires, it might switch a task from WAITING to READY or dispatch a callback. In the case of a callback, this will also trigger a context-switching for the TimerHandler System Task in which the callback is executed and the related timer(s) are appropriately updated.

Note that tasks might switch from WAITING to READY for reasons other than tick-related. In these cases, context switching might be triggered immediately if the readied task can preempt the running task.

2.4. Memory Allocator

Memory Allocator Control Block

Associated Block Pool

Number of Blocks

Block Size

Number of Free Blocks

Free Block List

Remember that the standard malloc() leads to fragmentation and (also, because of that) is highly indeterministic. Unless we use it once - to allocate memory before starting up, it doesn’t fit. But often, we need to 'multiplex' memory amongst tasks over time, that is, to dynamically allocate and deallocate.

To avoid fragmentation, we use fixed-size memory blocks. A simple approach would be a static table marking each block as free or taken. With this pattern, you need to search for the next available block, if any. The search time varies and is bounded only by the number of blocks, that is, O(n). To optimise, we keep track of what is free using a dynamic table: a linked list of addresses. Now we have O(1).

We use "meta-data" to initialise the linked list. Every address holds the "next" address value. All addresses are within the range of a pool of fixed-size blocks. This approach limits the minimal size of a block to the size of a memory address—32 bits for our supported architecture.

Yet, this is the cheapest way to store meta-data. If the meta-data were not stored in the free block itself, an extra 32-bit variable would be needed for each block, although blocks could then be smaller than 32 bits.

Allocating memory at runtime is a major source of latency (1), indeterministic (2) behaviour, and footprint overhead (3).

Design choice: the allocator’s design achieves low-cost, deterministic, fragmentation-free memory management by using fixed-size word-aligned block sizes (1)(2) and embedding metadata within the memory blocks themselves (3).

Benefits: run-time memory allocation without the real-time drawbacks.

Importantly, the kernel will always round up the block size to the next multiple of 4. Say the user creates a memory pool, assigning blocks to be 6-byte wide; they will turn into 8-byte blocks.

2.4.1. How it works

When a routine calls alloc(), the address to be returned is the one a "free list" is pointing to, say addr1. Before returning addr1 to the caller, we update the free list to point to the value stored within addr1 - say addr8 at that moment.

When a routine calls free(addr1), we overwrite whatever has been written in addr1 with the value the free list points to (if no more alloc() calls were issued, it would still be addr8), and addr1 becomes the free list head again.

Allocating and deallocating fixed-size blocks using this structure and storing meta-data this way is as deterministic (O(1)) and economical as we can get for dynamic memory allocation.

A drawback is that if a routine writes to non-allocated memory within a pool, it will corrupt the meta-data and the Allocator will fail.
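The mechanism can be sketched in portable C as below. Pool, poolInit, poolAlloc and poolFree are hypothetical names, not RK0's allocator API. One deliberate difference: so that the sketch also runs on a 64-bit host, block sizes are rounded up to a multiple of sizeof(void *) rather than to a multiple of 4 (on a 32-bit target both are the same).

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical control block, loosely following the fields listed above. */
typedef struct
{
    void  *freeList;   /* address of the first free block, or NULL */
    size_t blockSize;  /* rounded-up block size in bytes */
    size_t nBlocks;
    size_t nFree;
} Pool;

static size_t roundUpToWord(size_t size)
{
    const size_t word = sizeof(void *);
    return ((size + word - 1U) / word) * word;
}

/* Thread every block into the free list: each free block stores the
   address of the next one in its own first bytes. */
static void poolInit(Pool *pool, void *mem, size_t blockSize, size_t nBlocks)
{
    assert((pool != NULL) && (mem != NULL) && (blockSize > 0U) && (nBlocks > 0U));
    unsigned char *block = (unsigned char *)mem;
    pool->blockSize = roundUpToWord(blockSize);
    pool->nBlocks = nBlocks;
    pool->nFree = nBlocks;
    pool->freeList = mem;
    for (size_t i = 0U; i < nBlocks; i++)
    {
        void *next = (i + 1U < nBlocks) ? (void *)(block + pool->blockSize) : NULL;
        memcpy(block, &next, sizeof(next));
        block += pool->blockSize;
    }
}

/* O(1): pop the head; the new head is the address stored inside it. */
static void *poolAlloc(Pool *pool)
{
    void *block = pool->freeList;
    if (block == NULL)
    {
        return NULL;
    }
    memcpy(&pool->freeList, block, sizeof(void *));
    pool->nFree--;
    return block;
}

/* O(1): store the current head inside the block; the block becomes the head. */
static void poolFree(Pool *pool, void *block)
{
    assert((pool != NULL) && (block != NULL));
    memcpy(block, &pool->freeList, sizeof(void *));
    pool->freeList = block;
    pool->nFree++;
}
```

A pool created with 6-byte blocks ends up with 8-byte blocks, and a freed block is the first one handed out by the next allocation. The caller must supply a buffer of at least nBlocks times the rounded-up block size.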

3. Inter-Task Communication

Inter-Task Communication (ITC) refers to the mechanisms that enable tasks to coordinate/cooperate/synchronise by means of sending or receiving information that falls into two logical categories: Signals or Messages.

  • Signals: A signal is solely defined by its absence or presence. The meaning is implicit.

  • Messages: When the operations used for tasks to communicate also allow conveying a payload, these mechanisms are regarded as Message Passing.

3.1. Sleep-Wake Events (Sleep Queues)

Event Control Block

Sleeping Queue

Timeout Node

The simplest mechanism to handle events are the methods sleep(), wake() and signal() acting on an EVENT kernel object — a Sleep Queue.

Sleep Queues do not latch signals. Thus, the operation sleep(&event, timeout) always puts the caller task to sleep (except when RK_NO_WAIT is used as the timeout, in which case the call has no effect).

A signal(&event) will wake-up a single task - the highest priority. A wake(&event, n, &r) is a broadcast: at most n sleeping tasks will switch to READY. r will store the number of remaining tasks, if any.

To wake all tasks, one either sets n=0 or uses the flush(&event) helper.

Finally, a query() operation returns the number of sleeping tasks.

Every blocking (waiting) call issued with the option RK_NO_WAIT will be a try operation: if a condition is not satisfied, it immediately returns a value indicating an unsuccessful operation, and the caller is not blocked.

3.2. Direct Signals (Flags)

Within Task Control Block

Current Flags

Required Flags

Options

This primitive does not mimic POSIX or UNIX/BSD Signals.

Each Task Control Block stores event notifications other tasks will raise. Often we define that a 32-bit Signal carries 32 signal/event flags — it can represent a combination of 32 different events, if defining 1 event/bit. A bit set means an event is pending to be detected. A detected event is always consumed, that is, the bit is cleared.

Bitwise friendly, the API is written as set() (as to signal/post), get() (as to wait/pend).

A task checks for a combination of events it is expecting. This combination can be satisfied if ANY (OR logic) of the required bits are set or if ALL of the required bits are set (AND logic).

Thus, if the condition is not met, the task can optionally suspend, switching to the logical state PENDING.

When another task issues a set() whose result satisfies the waiting condition, the task state is then READY. The matched flags are consumed (cleared). A set is always an OR operation of an input mask over the current value. 0x00 is invalid for both set() and get() operations.

Additional operations are to query a task’s event register and, for a task, to clear its own register.
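The ANY/ALL matching and the consumption of matched flags can be modelled as below. This is a sketch of the logic only, without blocking: FlagsOpt, flagsSet and flagsTryGet are hypothetical names, and the return codes are assumptions, not those of kSignalSet or kSignalGet.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef enum { FLAGS_ANY, FLAGS_ALL } FlagsOpt; /* hypothetical options */

/* set(): OR the mask into the task's register. Returns -1 for a zero mask. */
static int flagsSet(uint32_t *reg, uint32_t mask)
{
    assert(reg != NULL);
    if (mask == 0U)
    {
        return -1;
    }
    *reg |= mask;
    return 0;
}

/* Non-blocking get(): returns 1 and consumes the matched flags if the
   condition holds, 0 if it does not (a blocking call would then pend),
   and -1 for a zero 'required' mask. */
static int flagsTryGet(uint32_t *reg, uint32_t required, FlagsOpt opt, uint32_t *got)
{
    assert((reg != NULL) && (got != NULL));
    if (required == 0U)
    {
        return -1;
    }
    uint32_t matched = *reg & required;
    int satisfied = (opt == FLAGS_ALL) ? (matched == required) : (matched != 0U);
    if (!satisfied)
    {
        return 0;
    }
    *got = matched;
    *reg &= ~matched; /* a detected event is always consumed */
    return 1;
}
```

With the register at 0x05, a request for 0x03 fails under ALL (bit 1 is missing) and leaves the register untouched, but succeeds under ANY, returning 0x01 and leaving 0x04 pending.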

3.2.1. Usage Example: Supervisor Task and Asynchronous Signals

One possible usage pattern is for a task to begin each cycle by checking for any events it is able (or supposed) to handle. Used on a supervisor task, this creates a neat event-driven pattern for a soft/firm real-time system:

VOID SupervisorTask(VOID *args)
{
    RK_UNUSEARGS

    ULONG gotFlags = 0UL;

    while(1)
    {
        /*  range: 0x01-0xFFFF, any bit. store in gotFlags. do not block.*/
        RK_ERR err = kSignalGet(0xFFFF, RK_FLAGS_ANY, &gotFlags, RK_NO_WAIT);
        if (err == RK_SUCCESS)
        {

            if (gotFlags & PENDING_AIRFLOW_INCREASE)
            {    /* notify actuator's task with the proper signal */
                   kSignalSet(airFlowTaskHandle, AIRFLOW_INCREASE_SIGNAL);
            }
                /* others... */


        }

        kSleepUntil(SUPERVISOR_T_PERIOD);
    }

}

Task Signals are the only ITC primitive that cannot be disabled; thus, they are regarded as a Core Mechanism.

3.3. Semaphores

Semaphore Control Block

Counter (Unsigned Integer)

Semaphore Type (Counter/Binary)

Waiting Queue

Timeout Node

A semaphore S is a nonnegative integer variable, apart from the operations it is subjected to. S is initialized to a nonnegative value. The two operations, called P and V, are defined as follows:

P(S): if S > 0 then S := S-1, else the process is suspended until S > 0.

V(S): if there are processes waiting, then one of them is resumed; else S := S+1.

(Dijkstra, 1968)

Semaphores are public kernel objects for signalling and waiting for events.

The primitive post() is V(S), and pend() is P(S), as described above.

When pend() is issued on a semaphore whose counter is 0, the caller (optionally) switches to a BLOCKED state, and is enqueued within the semaphore queue.

After that, every post() issued to a semaphore releases a single task, ordered by priority, until there are no more blocked tasks within the semaphore.

The internal counter will increase above 0 if and only if there are no tasks on the waiting queue when the semaphore is signalled.

3.3.1. Counting and Binary Semaphores

Events (Sleep Queues) do not latch events, Direct Signals do not accumulate; Counting Semaphores are event counters. They can count up to RK_SEMA_MAX_VALUE (default is 255).

The typical use case for Counting Semaphores is as a "credit tracker" — one uses it to verify (wait/pend) and indicate (signal/post) the availability of a countable resource — say, number of slots within a queue.

A Binary Semaphore is a specialisation: it counts up to 1 and down to 0, so it does not accumulate. We often say it is either FULL or EMPTY. The typical use case is task-to-task (unilateral or bilateral) or ISR-to-task (unilateral) synchronisation.

In this sense, they overlap with the Direct Signals mechanism, which can be seen as a pack of private binary semaphores (only the task itself can pend, but any task can post). They can also be used as Locks for mutual exclusion, but this has drawbacks, as will be explained later.

Semaphores can also broadcast a signal, in the same way as described for EVENTS, either by wake(n) or flush().

Finally, a query() operation on a semaphore will return the number of waiting tasks if any, or the counter value. To differentiate, the number of waiting tasks is returned as a negative value. A nonnegative value is the semaphore’s counter value.

Notes:

  1. The post and pend operations are aliased to signal() and wait() respectively, to satisfy those who prefer this nomenclature.

  2. If Binary Semaphore is initialised with a value > 1, the effective value is 1.
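The post/pend/query semantics described in this section can be modelled with a counter and a count of waiting tasks. This is a sketch under assumptions: SemaModel and its functions are hypothetical, the waiting queue is reduced to a number, and saturating at the maximum on post is an assumption (the kernel may report an error instead).

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model: no real blocking, just bookkeeping. */
typedef struct
{
    unsigned count;     /* semaphore counter */
    unsigned maxCount;  /* 1 for binary, e.g. 255 for counting */
    unsigned nWaiting;  /* number of tasks in the waiting queue */
} SemaModel;

static void semaInit(SemaModel *s, unsigned maxCount, unsigned initial)
{
    assert((s != NULL) && (maxCount > 0U));
    s->maxCount = maxCount;
    s->count = (initial > maxCount) ? maxCount : initial; /* binary: > 1 becomes 1 */
    s->nWaiting = 0U;
}

/* P(S): returns 1 if taken immediately, 0 if the caller would block. */
static int semaPend(SemaModel *s)
{
    if (s->count > 0U)
    {
        s->count--;
        return 1;
    }
    s->nWaiting++;
    return 0;
}

/* V(S): resume one waiter if any; otherwise increment the counter. */
static void semaPost(SemaModel *s)
{
    if (s->nWaiting > 0U)
    {
        s->nWaiting--;
    }
    else if (s->count < s->maxCount)
    {
        s->count++;
    }
}

/* query(): negative means the number of waiting tasks,
   nonnegative means the counter value. */
static int semaQuery(const SemaModel *s)
{
    return (s->nWaiting > 0U) ? -(int)s->nWaiting : (int)s->count;
}
```

Note that the counter and the waiting queue are never both non-zero: a post either resumes a waiter or increments the counter, which is why a single signed value is enough for query().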

3.3.2. Usage Example: Task-to-Task Bilateral Synchronisation

The snippet below shows two tasks lock-stepping by posting and pending on (binary) semaphores. Task2 depends on Task1 finishing 'work1' to perform 'work2'. And vice-versa.

(Note Direct Signals are a better choice for this use-case.)

RK_SEMA work1Sema;
RK_SEMA work2Sema;

VOID kApplicationInit(VOID)
{
/* semaphores init at 0 */
	kSemaInit(&work1Sema, RK_SEMA_BIN, 0);
	kSemaInit(&work2Sema, RK_SEMA_BIN, 0);

}

VOID Task1(VOID* args)
{
    RK_UNUSEARGS
	while (1)
	{
	    doWork1();
		/* T1 finished. Signal T2, then wait for T2. */
		kSemaPost(&work1Sema);
		kSemaPend(&work2Sema, RK_WAIT_FOREVER);

	}
}

VOID Task2(VOID* args)
{
    RK_UNUSEARGS
	while (1)
	{
		 kSemaPend(&work1Sema, RK_WAIT_FOREVER);
		 doWork2();
		 kSemaPost(&work2Sema);
	}
}

3.3.3. Mutex Semaphores (Locks)

Mutex Control Block

Locked State (Boolean)

Owner

Waiting Queue

Timeout Node

Mutex Node (list node within the owner TCB)

Some code regions are critical in that they cannot be accessed by more than one task at once. Acquiring (lock()) a mutex before entering a region and releasing it when leaving makes that region mutually exclusive.

A Mutex is another semaphore specialisation. It can be seen as a binary semaphore with a notion of ownership: when a task successfully acquires a mutex, it becomes the owner, and only this task can release it.

If a task tries to acquire an already locked mutex, it switches to BLOCKED state until the mutex is unlocked by its owner. Then, the highest priority task waiting to acquire the resource is dequeued, as on semaphores.

However, unlike semaphores, the complementary operation, unlock(), when issued by a non-owner, has undefined behaviour. In RK0, it results in a hard fault.

Mutexes are solely for mutual exclusion; they cannot be used for signalling. It is common to use Counting Semaphores initialised as 1, or Binary Semaphores for mutual exclusion.

However, particularly for a Counting Semaphore, if the count increases twice in a row, the mutual exclusion is gone. For both, Priority Inversion can become a problem, as will be explained.

PS: Mutexes in RK0 are not recursive. One cannot make reentrant calls on critical regions.

3.3.4. Usage Example: Bounded Buffer

The snippet below shows a consumer-producer pattern for a buffer with K slots (bounded buffer pattern). Two semaphores track the number of slots for the producer and items for the consumer. The mutex prevents any write or read from being disrupted.

RK_SEMA  item;
RK_SEMA  space;
RK_MUTEX lock;
#define N (K)
typedef struct mesg
{
 UINT field1;
 UINT field2;
 UINT field3;
 UINT field4;
} Mesg_t; /* a 16-byte message */

/* a ring buffer of messages */
Mesg_t mailbox[N]={0};

VOID kApplicationInit(VOID)
{
    kSemaInit(&item,  RK_SEMA_COUNT, 0); /* no items */
    kSemaInit(&space, RK_SEMA_COUNT, N); /* N buffers available */
    kMutexInit(&lock);
}
/* circular buffer handling omitted */

/* wait for space, lock, write, unlock, signal there is item */
VOID PostMail(Mesg_t* sendPtr)
{
    kSemaWait(&space, RK_WAIT_FOREVER);
    kMutexLock(&lock,  RK_WAIT_FOREVER);
    memcpy(&mailbox[tail], sendPtr, sizeof(Mesg_t));
    kMutexUnlock(&lock);
    kSemaSignal(&item);
}

/* wait for item, lock, read, unlock, signal there is space */
VOID PendMail(Mesg_t* recvPtr)
{
    kSemaWait(&item, RK_WAIT_FOREVER);
    kMutexLock(&lock,  RK_WAIT_FOREVER);
    memcpy(recvPtr, &mailbox[head], sizeof(Mesg_t));
    kMutexUnlock(&lock);
    kSemaSignal(&space);
}

3.3.5. Priority Inversion and the Priority Inheritance Protocol

Let TH, TM, and TL be three tasks with priority high (H), medium (M) and low (L), respectively. Say TH is dispatched and blocks on a mutex that 'TL' has acquired (i.e.: "TL is blocking TH").

If 'TM' does not need the resource, it will run and preempt 'TL' and, by transitivity, 'TH'.

From now on, 'TH' has an unbounded waiting time because any task with priority higher than 'L' that does not need the resource indirectly prevents it from being unblocked — awful.

The Priority Inheritance (PI) Protocol avoids this unbounded waiting. It is characterised by an invariant, simply put:

At any instant a Task assumes the highest priority amongst the tasks it is blocking.

If applied to the situation described above, task TM cannot preempt TL, whose effective priority would have been raised to H.

It is straightforward to reason about this when you consider the scenario of a single mutex.

But when locks nest (that is, when there is more than one critical region), the protocol also needs to handle two things:

  • Transitivity: if T1 is blocking T2, T2 is blocking T3, and T3 has the highest priority, then T3 propagates its priority to T1 via T2.

  • Multiple ownership: a task can own several mutexes at once. Thus, when exiting a critical region it needs to look up the waiting queue of each mutex it still owns and assume the highest priority found. If no task is blocked behind it, its nominal priority is restored. (As tasks are enqueued by priority, this means looking at the task at the head of each waiting queue.)

Below, a demonstration:

/* Task1 has the Highest nominal priority */
/* Task2 has the Medium nominal priority */
/* Task3 has Lowest nominal priority */

/* Note: Task3 runs first, as Task1 and Task2 start by sleeping */

RK_DECLARE_TASK(task1Handle, Task1, stack1, STACKSIZE)
RK_DECLARE_TASK(task2Handle, Task2, stack2, STACKSIZE)
RK_DECLARE_TASK(task3Handle, Task3, stack3, STACKSIZE)


RK_MUTEX mutexA;
RK_MUTEX mutexB;

VOID kApplicationInit(VOID)
{
	kassert(!kCreateTask(&task1Handle, Task1, RK_NO_ARGS, "Task1", stack1, \
		STACKSIZE, 1, RK_PREEMPT));
	kassert(!kCreateTask(&task2Handle, Task2, RK_NO_ARGS, "Task2", stack2, \
		STACKSIZE, 2, RK_PREEMPT));
	kassert(!kCreateTask(&task3Handle, Task3, RK_NO_ARGS, "Task3", stack3, \
		STACKSIZE, 3, RK_PREEMPT));

/* mutexes initialised with priority inheritance enabled */
	kMutexInit(&mutexA, RK_INHERIT);
	kMutexInit(&mutexB, RK_INHERIT);
}



VOID Task3(VOID *args)
{
	RK_UNUSEARGS
	while (1)
	{
		printf("@ %lums: [TL] Attempting to LOCK 'A' | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioReal);

		kMutexLock(&mutexA, RK_WAIT_FOREVER);

		printf("@ %lums: [TL] LOCKED 'A' (in CS) | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioReal);

		kBusyWait(60); /* <-- important */

		printf("@%lums: [TL] About to UNLOCK 'A' | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioReal);

		kMutexUnlock(&mutexA);

		printf("--->");
		printf("@%lums: [TL] Exit CS | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioReal);

		kSleep(4);
	}
}

VOID Task2(VOID *args)
{
	RK_UNUSEARGS
	while (1)
	{
		kSleep(5);

		printf("@%lums: [TM] Attempting to LOCK 'B' | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioReal);
		kMutexLock(&mutexB, RK_WAIT_FOREVER);

		printf("@%lums: [TM] LOCKED 'B', now trying to LOCK 'A' | Eff: %d | Nom: %d\r\n",
			   kTickGet(), runPtr->priority, runPtr->prioReal);
		kMutexLock(&mutexA, RK_WAIT_FOREVER);

		printf("@%lums: [TM] LOCKED 'A' (in CS) | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioReal);
		kMutexUnlock(&mutexA);

		printf("@%lums: [TM] UNLOCKING 'B' | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioReal);

		kMutexUnlock(&mutexB);

		printf("--->");

		printf("@%lums: [TM] Exit CS | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioReal);
	}
}

VOID Task1(VOID *args)
{
	RK_UNUSEARGS
	while (1)
	{
		kSleep(2);

		printf("@%lums: [TH] Attempting to LOCK 'B'| Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioReal);

		kMutexLock(&mutexB, RK_WAIT_FOREVER);

		printf("@%lums: [TH] LOCKED 'B' (in CS)  | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioReal);

		kMutexUnlock(&mutexB);

		printf("--->");

		printf("@%lums: [TH] Exit CS | Eff: %d | Nom: %d\r\n", kTickGet(),
			   runPtr->priority, runPtr->prioReal);
	}
}

Result and comments:

>>>> TL locks 'A'. Higher priority tasks are sleeping. <<<<

@ 14720ms: [TL] Attempting to LOCK 'A' | Eff: 3 | Nom: 3
@ 14720ms: [TL] LOCKED 'A' (in CS) | Eff: 3 | Nom: 3

@14721ms: [TM] Attempting to LOCK 'B' | Eff: 2 | Nom: 2

>>>> TM acquires 'B' and is blocked by TL on 'A'. TL inherits TM's  priority. <<<<

@14721ms: [TM] LOCKED 'B', now trying to LOCK 'A' | Eff: 2 | Nom: 2

>>>> TH will be blocked by TM on 'B': <<<<

@14722ms: [TH] Attempting to LOCK 'B'| Eff: 1 | Nom: 1

>>>> TM inherits TH's priority. TL inherits TH's priority via TM. <<<<

@14780ms: [TL] About to UNLOCK 'A' | Eff: 1 | Nom: 3

>>>> Upon unlocking 'A', TL is preempted by TM. It means TL's priority has been restored, as it is no longer blocking a higher priority task. <<<<


>>>> Now TM acquires 'A' <<<<

@14780ms: [TM] LOCKED 'A' (in CS) | Eff: 1 | Nom: 2

>>>> After releasing 'A', but before releasing 'B', TM's priority is still '1', as it is blocking TH while holding 'B'. <<<<

@14780ms: [TM] UNLOCKING 'B' | Eff: 1 | Nom: 2

>>>> Upon unlocking 'B' TM is preempted by TH. (TM's priority has been restored.) <<<<

@14780ms: [TH] LOCKED 'B' (in CS)  | Eff: 1 | Nom: 1

>>> RESULT: even though priority inversion was enforced, tasks leave the nested lock ordered by their nominal priority. <<<

--->@14780ms: [TH] Exit CS | Eff: 1 | Nom: 1
--->@14780ms: [TM] Exit CS | Eff: 2 | Nom: 2
--->@14780ms: [TL] Exit CS | Eff: 3 | Nom: 3

Importantly, the worst-case time is bounded by the time the lowest priority task holds a lock (60 ms in the example: 14720ms → 14780ms).

As for each priority update we check each waiting queue for each mutex a task owns, the time complexity is linear in owners × mutexes. But typically no task ever holds more than a few mutexes. Still, one should not nest locks unless needed.
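
The priority update described above can be modelled in a few lines. This is a sketch under assumptions, not the kernel's code: `ModelMutex_t`, `ModelTask_t`, `NO_WAITER`, `MAX_OWNED` and `EffectivePrio` are hypothetical names, and each mutex is reduced to the priority of the task at the head of its waiting queue. As in the demonstration, a lower number means a higher priority.

```c
#include <assert.h>
#include <stddef.h>

#define NO_WAITER 255u /* hypothetical marker: the waiting queue is empty */
#define MAX_OWNED 4u   /* hypothetical bound on mutexes held at once */

typedef struct
{
    /* priority of the task at the head of the waiting queue
       (queues are priority-ordered), or NO_WAITER */
    unsigned headPrio;
} ModelMutex_t;

typedef struct
{
    unsigned      nominal;          /* priority assigned at creation */
    ModelMutex_t *owned[MAX_OWNED]; /* mutexes currently held */
    unsigned      nOwned;
} ModelTask_t;

/* One pass over the owned mutexes: linear in the number of mutexes held.
   Returns the nominal priority when nobody is blocked behind the task. */
unsigned EffectivePrio(const ModelTask_t *t)
{
    assert(t != NULL && t->nOwned <= MAX_OWNED);
    unsigned prio = t->nominal;
    for (unsigned i = 0; i < t->nOwned; i++)
    {
        unsigned waiter = t->owned[i]->headPrio;
        if (waiter != NO_WAITER && waiter < prio)
        {
            prio = waiter; /* inherit from the most urgent blocked task */
        }
    }
    return prio;
}
```

In this model, transitivity falls out if a blocked task is enqueued with its effective (already inherited) priority rather than its nominal one.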

Mutexes vs Binary Semaphores

There is (or used to be) a lot of fuss about whether binary semaphores are appropriate to use as locks. As a practical guideline, if all tasks sharing the resource have the same priority, using a binary semaphore can be appropriate — because a binary semaphore is considerably faster. It all depends on the case.

The drawback is the lack of ownership: any task can accidentally release the resource. On a large codebase, this can become a real problem. Nonetheless, this is a problem for semaphores in general.

For tasks with different priorities, binary semaphores should never be considered for mutual exclusion unless priority inversion is not a problem (how?).

A counting semaphore initialised to 1 is too risky. Besides the priority inversion, if the count ever increases above 1, mutual exclusion is lost, and multiple tasks can enter the critical section at once.
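
The risk can be shown with a toy model. The sketch below is not the RK0 semaphore: `ModelSema_t`, `SemaTryWait` and `SemaSignal` are hypothetical, and the saturation at `max` is an assumption about how a binary semaphore behaves. A single stray signal lets two tasks through the counting variant, while the binary variant still admits only one.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct
{
    unsigned count;
    unsigned max; /* 1 models a binary semaphore, larger a counting one */
} ModelSema_t;

/* Non-blocking wait: succeeds only if a token is available. */
bool SemaTryWait(ModelSema_t *s)
{
    assert(s != NULL);
    if (s->count == 0u)
    {
        return false;
    }
    s->count--;
    return true;
}

/* Signal: add a token, saturating at max. */
void SemaSignal(ModelSema_t *s)
{
    assert(s != NULL);
    if (s->count < s->max)
    {
        s->count++;
    }
}
```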

3.4. Scheduler Lock

Often, we need a task to perform operations without being preempted. A mutex serialises access to a code region but does not prevent a task from being preempted while operating on data. Depending on the case, this can lead to inconsistent data state.

An aggressive way is to disable interrupts globally. For kernel services, it is often the only way to keep data integrity. At a higher level, it is feasible for very short operations and/or when you need to protect data from interrupts altogether.

A less aggressive approach is to make the task non-preemptible with kSchLock() before entering the critical region and kSchUnlock() when leaving. This way, interrupts are still being sensed, and even higher-priority tasks might switch to a ready state, but the running thread will not be preempted.

The priority inversion it potentially causes is bounded. If a higher-priority task is readied while the scheduler is locked, the context switch happens immediately after unlocking.

Note that for locking/unlocking the scheduler the global interrupts will be disabled for the time to increment/decrement a counter.
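
The behaviour described in this section (a nesting counter, plus a context switch deferred until the scheduler is unlocked) can be sketched as a small state machine. This is a host-side model, not the implementation of kSchLock()/kSchUnlock(); `ModelSched_t`, `SchLock`, `SchUnlock` and `SchRequestSwitch` are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct
{
    unsigned lockDepth;     /* nesting counter; 0 means preemption enabled */
    bool     switchPending; /* a higher-priority task became ready meanwhile */
    unsigned switches;      /* context switches performed (for observation) */
} ModelSched_t;

void SchLock(ModelSched_t *s)
{
    assert(s != NULL);
    /* in a real kernel, interrupts are masked only around this increment */
    s->lockDepth++;
}

/* Called when a higher-priority task is readied, e.g. from an ISR. */
void SchRequestSwitch(ModelSched_t *s)
{
    assert(s != NULL);
    if (s->lockDepth == 0u)
    {
        s->switches++;           /* preempt right away */
    }
    else
    {
        s->switchPending = true; /* defer until the scheduler is unlocked */
    }
}

void SchUnlock(ModelSched_t *s)
{
    assert(s != NULL && s->lockDepth > 0u);
    s->lockDepth--;
    if (s->lockDepth == 0u && s->switchPending)
    {
        s->switchPending = false;
        s->switches++;           /* the deferred switch happens here */
    }
}
```

The model makes the bound visible: the readied task waits exactly as long as the locked region lasts, and no longer.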


To add to the discussion: when two threads need to 'read-modify-write' the same data, a lock-free alternative is the LDREX/STREX instruction pair of ARMv7-M (or, more generally, C11 atomics). These do not prevent preemption. On ARMv7-M in particular, if an ISR touches the data before the store-exclusive concludes, the exclusive ownership is lost and the store fails, so the sequence has to be retried. They are typically used for multi-core spin-locking.
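
As a portable illustration of that retry structure, here is a minimal read-modify-write using C11 atomics from <stdatomic.h>. `AtomicSetBits` is a hypothetical helper, not an RK0 service. The compare-exchange loop has the same shape as a load-exclusive/store-exclusive sequence: load, compute, attempt the store, and retry if another context got in between.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Set bits in a shared word without taking a lock.
   Returns the value observed just before the successful update. */
uint32_t AtomicSetBits(_Atomic uint32_t *word, uint32_t mask)
{
    assert(word != NULL);
    uint32_t old = atomic_load(word);
    /* On failure, 'old' is refreshed with the current value and we retry. */
    while (!atomic_compare_exchange_weak(word, &old, old | mask))
    {
        /* another context changed *word between the load and the store */
    }
    return old;
}
```

For this particular operation `atomic_fetch_or()` does the same job in a single call; the explicit loop is shown only because it generalises to updates that have no dedicated atomic function.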


3.5. Condition Variables

Note that a condition "variable" is neither true nor false; indeed, it does not have any stored value accessible to the program. In practice, a condition variable will be represented by an (initially empty) queue of processes which are currently waiting on the condition.

(Hoare, 1980)

The EVENT object is (structurally) a Condition Variable as described by Hoare. In RK0, you can use them along with Mutexes, following the same semantics for Pthreads Condition Variables:

  1. Whenever a task needs to test for a condition (a predicate) before proceeding, a mutex is locked to test the condition within a critical region. If the condition evaluates true, it proceeds and unlocks the mutex at the end.

  2. If the condition evaluates false, the task unlocks the mutex (so the region can be accessed by another task) and goes to sleep waiting for the condition. Unlocking and going to sleep must happen as a single atomic operation.

  3. When the condition is satisfied, another task will signal (or broadcast to) the sleeping task(s). Whether the woken task tests the condition again after relocking the mutex depends on the case.

One possible pattern is the Mesa Monitor, in which the condition is checked in a while loop:

/* Mesa Monitor Pattern for a Condition Variable */

RK_EVENT condVar;
RK_MUTEX guardMutex;

/* entering the critical region */

lock(&guardMutex);

while (!condition)
{
    /* disable preemption */
    schlock();
    unlock(&guardMutex);
    sleep(&condVar);
    /* enable preemption */
    schunlock();

    /* when signalled, wake here, lock the mutex, test again */
    lock(&guardMutex);
}
/* proceed */

 ....

/* exit critical region */
unlock(&guardMutex);

return;

The API provides helpers: kCondVarWait(&event, &mutex, timeout), kCondVarSignal(&event) and kCondVarBroadcast(&event). kCondVarWait is the really helpful one, as it provides the atomicity needed. If the sequence is not atomic, unlock() might cause a context switch before the running task can go to sleep, and the wake-up signal can be lost.

broadcast() and signal() are just aliases for flush and signal on an EVENT.

3.5.1. Usage Example: Synchronisation Barrier

A given number of tasks must reach a point in the program before all can proceed, so every task calls BarrierWait(&barrier, nTasks) to wait for the set of tasks it must synchronise with.

The last task entering the barrier will broadcast a signal to all tasks waiting for the wake condition.

Note that the mutex enforces a single active task within the barrier. They enter and leave on a 'turnstile'.

/* Synchronisation Barrier */

typedef struct
{
    RK_MUTEX lock;
    RK_EVENT event;
    UINT count;        /* number of tasks in the barrier */
    UINT round;        /* increased every time all tasks synch     */
} Barrier_t;

VOID BarrierInit(Barrier_t *const barPtr)
{
    kMutexInit(&barPtr->lock, RK_INHERIT);
    kEventInit(&barPtr->event);
    barPtr->count = 0;
    barPtr->round = 0;
}

VOID BarrierWait(Barrier_t *const barPtr, UINT const nTasks)
{
    UINT myRound = 0;
    kMutexLock(&barPtr->lock, RK_WAIT_FOREVER);

    /* save round number */
    myRound = barPtr->round;
    /* increase count on this round */
    barPtr->count++;

    if (barPtr->count == nTasks)
    {
        /* reset counter, inc round, broadcast to sleeping tasks */
        barPtr->round++;
        barPtr->count = 0;
        kCondVarBroadcast(&barPtr->event);
    }
    else
    {
        /* a proper wake signal might happen after inc round */
        while ((UINT)(barPtr->round - myRound) == 0U)
        {
            kCondVarWait(&barPtr->event, &barPtr->lock, RK_WAIT_FOREVER);
        }
    }

    kMutexUnlock(&barPtr->lock);

}


#define N_BARR_TASKS 3

Barrier_t syncBarrier;

VOID kApplicationInit(VOID)
{

    kassert(!kCreateTask(&task1Handle, Task1, RK_NO_ARGS, "Task1", stack1, STACKSIZE, 2, RK_PREEMPT));
    kassert(!kCreateTask(&task2Handle, Task2, RK_NO_ARGS, "Task2", stack2, STACKSIZE, 3, RK_PREEMPT));
    kassert(!kCreateTask(&task3Handle, Task3, RK_NO_ARGS, "Task3", stack3, STACKSIZE, 1, RK_PREEMPT));
    BarrierInit(&syncBarrier);
}
VOID Task1(VOID* args)
{
    RK_UNUSEARGS
    while (1)
    {
        kPuts("Task 1 is waiting at the barrier...\r\n");
        BarrierWait(&syncBarrier, N_BARR_TASKS);
        kPuts("Task 1 passed the barrier!\r\n");
		kSleep(8);

    }
}

VOID Task2(VOID* args)
{
    RK_UNUSEARGS
    while (1)
    {
        kPuts("Task 2 is waiting at the barrier...\r\n");
        BarrierWait(&syncBarrier, N_BARR_TASKS);
        kPuts("Task 2 passed the barrier!\r\n");
		kSleep(5);
	}
}

VOID Task3(VOID* args)
{
    RK_UNUSEARGS
    while (1)
    {
        kPuts("Task 3 is waiting at the barrier...\r\n");
        BarrierWait(&syncBarrier, N_BARR_TASKS);
        kPuts("Task 3 passed the barrier!\r\n");
        kSleep(3);
	}
}

syncbarr

3.5.2. Usage Example: Readers Writers Lock

Several readers and writers share a piece of memory. Readers can concurrently access the memory to read; a single writer is allowed (otherwise, data would be corrupted).

When a writer finishes, it checks for any readers waiting. If there are any, the writer flushes the readers' waiting queue. If not, it wakes a single writer, if any. When the last reader finishes, it signals a writer.

Every read or write operation begins with an acquire and finishes with a release.

/* RW-Lock */

/* a single writer is allowed if there are no readers */
/* several readers are allowed if there is no writer*/
typedef struct
{
	RK_MUTEX	 lock;
	RK_EVENT	 writersGo;
	RK_EVENT	 readersGo;
	INT			 rwCount; /* number of active readers if > 0 */
						  /* active writer if -1             */
}RwLock_t;

VOID RwLockInit(RwLock_t *const rwLockPtr)
{

	kMutexInit(&rwLockPtr->lock, RK_INHERIT);
	kEventInit(&rwLockPtr->writersGo);
	kEventInit(&rwLockPtr->readersGo);
	rwLockPtr->rwCount = 0;
}

/* A writer can acquire if  rwCount = 0 */
/* An active writer is indicated by rwCount = -1; */
VOID RwLockAcquireWrite(RwLock_t *const rwLockPtr)
{
	kMutexLock(&rwLockPtr->lock, RK_WAIT_FOREVER);
	/* if different than 0, there are either writers or readers */
	/* sleep to be signalled */
	while (rwLockPtr->rwCount != 0)
	{
	    kCondVarWait(&rwLockPtr->writersGo, &rwLockPtr->lock, RK_WAIT_FOREVER);
	    /* mutex is locked when waking up*/
	}
	/* woke here, set an active writer */
	rwLockPtr->rwCount = -1;
	kMutexUnlock(&rwLockPtr->lock);
}

/* a writer releases, waking up all waiting readers, if any */
/* if there are no readers, a writer can get in */
VOID RwLockReleaseWrite(RwLock_t *const rwLockPtr)
{
	kMutexLock(&rwLockPtr->lock, RK_WAIT_FOREVER);

	rwLockPtr->rwCount = 0; /* indicate no writers*/

	/* if there are waiting readers, flush */
	ULONG nWaitingReaders=0;
	kEventQuery(&rwLockPtr->readersGo, &nWaitingReaders);
	if (nWaitingReaders > 0)
	{
	    /* condVarBroadcast is just an alias for an event flush */
		kEventFlush(&rwLockPtr->readersGo);
	}
	else
	{
		/* wake up a single writer if any */
		kEventSignal(&rwLockPtr->writersGo);
	}
	kMutexUnlock(&rwLockPtr->lock);
}

/* a reader can acquire if there are no writers */
VOID RwLockAcquireRead(RwLock_t *const rwLockPtr)
{
	kMutexLock(&rwLockPtr->lock, RK_WAIT_FOREVER);
	/* if there is an active writer, sleep */
	while (rwLockPtr->rwCount < 0)
	{
	    kCondVarWait(&rwLockPtr->readersGo, &rwLockPtr->lock, RK_WAIT_FOREVER);
	    /* mutex is locked when waking up*/
	}
	/* increase rwCount, so its > 0, indicating readers */
	rwLockPtr->rwCount ++;
	kMutexUnlock(&rwLockPtr->lock);
}

/* a reader releases and wakes a single writer */
/* if it is the last reader */
VOID RwLockReleaseRead(RwLock_t *const rwLockPtr)
{
	kMutexLock(&rwLockPtr->lock, RK_WAIT_FOREVER);
	rwLockPtr->rwCount --;
	if (rwLockPtr->rwCount == 0)
	{
		kEventSignal(&rwLockPtr->writersGo);
	}
	kMutexUnlock(&rwLockPtr->lock);
}

In the image below, four tasks (a fast writer, Task 1; a slow writer, Task 4; and two readers, with Task 3 faster than Task 2) read from and write to a shared UINT variable:

readerwriter 4

Message Passing

In real-time applications, Message Passing often encounters the following scenarios:

  • Some messages are consumed by tasks that can’t do anything before processing information; thus, these messages end up also being signals. For example, a server blocks until it receives a command to process, and/or a client blocks until it gets an answer.

  • A particular case of the above scenario is fully synchronous: client and server run in lockstep.

  • Two tasks with different rates need to communicate and cannot run in lockstep. A faster producer might use a buffer to accommodate a relatively small burst of generated data, or a quicker consumer will drop repeated received data.

  • Other times, we need to correlate data with time for processing, so using a queue gives us the idea of data motion. E.g., when calculating the mean value of a transducer over a given period.

  • Past data is useless for real-time tasks such as servo-control loops. Consumers need the most recent data for processing. For example, a drive-by-wire system or a robot avoiding obstacles. In these cases, message passing must be lock-free while guaranteeing data integrity.

  • The message-passing mechanisms do not reuse any of the mechanisms presented so far.

  • The same try semantics applies when using the RK_NO_WAIT timeout option.

3.6. Mailbox

Mailbox Control Block

  • Mail Address

  • Waiting queue

  • Owner Task*

In GPOS jargon, mailboxes are queues of messages—a distinction from pipes (which are byte streams)—but in embedded system software, mailboxes are often said to have a capacity of a single item. More recently, you will not find it as a distinct mechanism—you use a 1-item queue.

A Mailbox allows a task to exclusively write (post) and read (pend) a memory region and to be notified when another task writes to or reads from it. Therefore, its typical operation provides mutual exclusion and notification: very handy.

A message within a mailbox is the address of an object. The sender and receiver agree on the concrete mail implementation as part of the mail interface contract. Also, the data has to remain unchanged until the receiver 'consumes' it. That is another part of the contract.

The semantics are simple: a Mailbox will be EMPTY when its storage points to NULL; otherwise, it is FULL. The mailbox will be empty/full after a successful pend()/post() operation.

When a producer calls post() on a FULL mailbox, it (optionally) blocks and is placed in the Mailbox waiting queue. The associated task switches to the state SENDING.

Likewise, a consumer (optionally) blocks when issuing a pend() on an EMPTY Mailbox. The task state switches to RECEIVING, and the task is enqueued in the mailbox waiting queue.

A mailbox can be initialised as FULL if the initial pointer provided is non-null.

Mailboxes are well-suited for 1:1 communication - fully synchronous (lockstep) command-response or when a task waits for a notification plus a payload (say, the last data read by an Interrupt routine). A typical use case is when one wants to deliver a signal along with a payload—a message as a signal.

Besides post() and pend(), other primitives are peek() to read without removing (non-destructive) and postovw() to overwrite whatever is in a full mailbox.

* we will discuss ownership on message passing later.

Passing Messages by reference is a typical “embedded thing” – because it is cheap, deterministic and DMA-friendly.
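
The EMPTY/FULL semantics above can be captured in a small single-threaded model. It is a sketch, not the RK0 mailbox: `ModelMbox_t` and the four functions are hypothetical, and blocking is replaced by a `false` return, which mirrors what a try operation would report. Only the rule "NULL means EMPTY" and the four primitives come from the text.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct
{
    void *mail; /* NULL means EMPTY, anything else means FULL */
} ModelMbox_t;

/* post: fails on a FULL mailbox (where a real task could block instead) */
bool MboxTryPost(ModelMbox_t *mb, void *mesgPtr)
{
    assert(mb != NULL && mesgPtr != NULL); /* NULL is reserved for EMPTY */
    if (mb->mail != NULL)
    {
        return false;
    }
    mb->mail = mesgPtr;
    return true;
}

/* peek: read without removing; the mailbox stays FULL */
bool MboxPeek(const ModelMbox_t *mb, void **recvPPtr)
{
    assert(mb != NULL && recvPPtr != NULL);
    if (mb->mail == NULL)
    {
        return false;
    }
    *recvPPtr = mb->mail;
    return true;
}

/* pend: fails on an EMPTY mailbox; on success the mailbox becomes EMPTY */
bool MboxTryPend(ModelMbox_t *mb, void **recvPPtr)
{
    if (!MboxPeek(mb, recvPPtr))
    {
        return false;
    }
    mb->mail = NULL;
    return true;
}

/* post-overwrite: replace whatever is in the mailbox */
void MboxPostOvw(ModelMbox_t *mb, void *mesgPtr)
{
    assert(mb != NULL && mesgPtr != NULL);
    mb->mail = mesgPtr;
}
```

Note that only the address travels: the object it points to must stay valid and unchanged until the receiver is done with it, as the contract above requires.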

3.6.1. Usage Example: Zero-Buffer Channel

Some communications are unreliable, or important enough, that we need guarantees not only that the message was sent but also that it was read.

On a zero-buffer channel, we do not let messages sit waiting to be picked up. The sender blocks, waiting for a confirmation that the receiver retrieved the message:

/* sender needs to be sure message has arrived */
SenderTask:

   err = kMboxPost(...., timeout);
   if (err==ERR_TIMEOUT)
       retryPost();
   if (err==SUCCESS)
   {
       /* pend on private bin semaphore, to wait for confirmation it was read */
       err = kPend(timeout);
       if (err == ERR_TIMEOUT)
       {
           /* receiver did not ack before time-out */
       }
   }


ReceiverTask:
    err = kMboxPend( ..., timeout);
    if (err==ERR_TIMEOUT)
        retryPend();
    if(err==SUCCESS)
       /* post to sender's semaphore, to ack message was received */
       kSignal(senderTaskHandle);


/* using a mailbox instead of a binary semaphore */


RK_MBOX reqBox; /* request message */
RK_MBOX ackBox; /* ack message */

SenderTask:

   err = kMboxPost(&reqBox, &reqMesg, timeout);
   if (err==ERR_TIMEOUT)
       retryPost();
   if (err==SUCCESS)
   {
       /* the acknowledgement mail can be a dummy message */

       err = kMboxPend(&ackBox, &recvAck, timeout);
       if (err == ERR_TIMEOUT)
       {
           /* receiver did not ack before time-out */
           /* retryPost ? retryAck? Fail gracefully? */
       }
   }


ReceiverTask:
    err = kMboxPend( ..., timeout);
    if (err==ERR_TIMEOUT)
        retryPend();
    if(err==SUCCESS)
    {
       err = kMboxPost(&ackBox, &ackMesg, timeout);
       if (err==ERR_TIMEOUT)
            retryAck();
    }

3.6.2. Example: Multi-client-server synchronous command-response

The snippet below presents two clients and one server communicating in lockstep.

It shows how data scope is kept and how it can be lost. In this case, the data stays in scope because client and server each block waiting for a response/ACK.

/* This example includes  <string.h> for convenience */

RK_MBOX serverReqMbox; /*  server incoming commands */
RK_MBOX serverAckMbox; /*  server incoming response acks */
RK_MBOX clientMbox1;   /*  response for client 1 */
RK_MBOX clientMbox2;   /* response for client 2 */

/* Command Requests are assembled on an Application Data Unit */
typedef struct
{
    BYTE length; /* Length of the APDU payload */
    BYTE payload[32]; /* APDU payload */
    RK_MBOX *replyMbox; /* Pointer to the client's reply mailbox */
} APDU __K_ALIGN(4);

VOID kApplicationInit(VOID)
{
    kMboxInit(&serverReqMbox,  NULL);
    kMboxInit(&serverAckMbox, NULL);
    kMboxInit(&clientMbox1, NULL);
    kMboxInit(&clientMbox2, NULL);

}

/* Highest Priority */
/* the server response is to ECHO the request back to the client; then, it pends on a mailbox waiting for the client to acknowledge the response. Only then does it proceed to process further requests.  */

VOID ServerTask(VOID* args)
{
    RK_UNUSEARGS

    APDU *request, response;
    UINT* ackResp;
    while (1)
    {
        /* Wait for a request */
        if (kMboxPend(&serverReqMbox, (VOID **)&request, RK_WAIT_FOREVER) == RK_SUCCESS)
        {
            kprintf("[SERVER] RECV: %s\r\n", request->payload);

            /* Process the request */
            response.length = (BYTE) snprintf((char*) response.payload,
                    sizeof(response.payload), "ECHO %s",
                    request->payload);

            /* Echo to client's reply mailbox */
            if (kMboxPost(request->replyMbox, &response, RK_WAIT_FOREVER) != RK_SUCCESS)
            {
                kprintf("ECHO fail\r\n");
            }
            if (kMboxPend(&serverAckMbox, (VOID **)&ackResp, RK_WAIT_FOREVER) == RK_SUCCESS)
                kprintf("[SERVER] CLIENT %d SERVED.\r\n", *ackResp);
            /* now it is safe to process another request */
        }
    }
}
/* same priority as Client2 */
VOID Client1Task(VOID* args)
{
    RK_UNUSEARGS

    APDU request, *response;

    while (1)
    {
        /* Prepare the request */
        snprintf((char*) request.payload, sizeof(request.payload),
                "Hello from Client 1");
        request.length = (BYTE) strlen((char*) request.payload);
        request.replyMbox = &clientMbox1; /* Specify the reply mailbox */

        /* Send the request to the server */
        if (kMboxPost(&serverReqMbox, &request, RK_WAIT_FOREVER) == RK_SUCCESS)
        {

            /* Wait for the response */
            if (kMboxPend(&clientMbox1, (VOID **)&response, RK_WAIT_FOREVER)
                    == RK_SUCCESS)
            {
                kprintf("[CLIENT #1] RECV: %s\r\n", response->payload);
                UINT ack=1;
                kMboxPost(&serverAckMbox, &ack, RK_WAIT_FOREVER);
                /* now it is safe to send another request */
            }
            else
            {
                kprintf("1F\r\n");
            }
        }
        else
        {
            kprintf("1F\r\n");
        }

    }
}
VOID Client2Task(VOID* args)
{
    RK_UNUSEARGS
    APDU request, *response;

    while (1)
    {
        /* Prepare the request */
        snprintf((char*) request.payload, sizeof(request.payload),
                "Hello from Client 2");
        request.length = (BYTE) strlen((char*) request.payload);
        request.replyMbox = &clientMbox2; /* Specify the reply mailbox */

        /* Send the request to the server */
        if (kMboxPost(&serverReqMbox, &request, RK_WAIT_FOREVER) == RK_SUCCESS)
        {

            /* Wait for the response */
            if (kMboxPend(&clientMbox2, (VOID **)&response, RK_WAIT_FOREVER)
                    == RK_SUCCESS)
            {
                kprintf("[CLIENT #2] RECV: %s\r\n", response->payload);
                UINT ack=2;
                kMboxPost(&serverAckMbox, &ack, RK_WAIT_FOREVER);
            }
            else
            {
                kprintf("2FAIL\r\n");
            }
        }
        else
        {
            kprintf("2FAIL\r\n");
        }

    }
}
clientserver

Had the server not blocked waiting for an ACK, the former response would have been overwritten before a client could have read it, given how priorities are set. To accommodate two clients while still passing by reference, the server would need to keep the responses in separate buffers.

If a copy were passed as a response, the server would not need to block for an ACK, provided the response was sent before receiving another request.

3.7. Signals as a Direct Channel

Direct Signals is the only Inter-Task Communication service that is always enabled. It can also be used for message passing—and no, it is not a "hack."

One can use either a first message (do not overwrite a current message) or last message (overwrite) semantics.

A possible pattern is as follows — in this case an unbuffered direct channel:

/* first-message semantics */
RK_ERR SendMesg(RK_TASK_HANDLE const taskHandle, ULONG const mesg)
{
    /* return 1 if there is a message */
    /* for the last message semantics, skip this check */
    ULONG query=0;
    RK_ERR err = kSignalQuery(taskHandle, &query);
    if (err < 0)
        return (err);
    if (query != 0) /* receiver is 'full' */
        return (1);
    err = kSignalSet(taskHandle, mesg);
    return (err);
}


RK_ERR RecvMesg(ULONG *const recvPtr, RK_TICK const timeout)
{
    /* watch all 32 bits and wake when any of them is set;
    on return, all bits are cleared */
    RK_ERR err = kSignalGet(0xFFFFFFFF, RK_FLAGS_ANY, recvPtr, timeout);
    return (err);
}

/*****

Note that ZERO is not a valid parameter for a Signal.

You can establish a contract such as splitting the 32-bit message on different fields; besides, you can always pass a pointer:

*****/


/* EXAMPLE: fully synchronous message-passing */

struct mesg
{
    RK_TASK_HANDLE senderHandle;
    CHAR   mesg[8];

    /* others fields */

} __K_ALIGN(4);

typedef struct mesg Mesg_t;

#define ACK 0x01U

VOID RecvTask( VOID *args)
{
    RK_UNUSEARGS

    ULONG recvMesg = 0;

    while(1)
    {
        /* pend for receiving */
        RecvMesg(&recvMesg, RK_WAIT_FOREVER);

        /* cast 4-byte received to Mesg_t* */
        Mesg_t* recvMesgPtr = (Mesg_t*) recvMesg;

        /* ... work(recvMesgPtr) ... */

        /* ack reception */
        kSignalSet(recvMesgPtr->senderHandle, ACK);

    }
}


VOID SenderTask( VOID *args)
{
    RK_UNUSEARGS

    Mesg_t sendMesg = {0};

    while(1)
    {
        /* ... generate message... */


        /* pass the address of sendMesg as a ULONG */
        RK_ERR err = SendMesg(recvTaskHandle, (ULONG)&sendMesg);

        /* block for an ACK */
        kSignalGet(ACK, RK_FLAGS_ALL, NULL, RK_WAIT_FOREVER);

    }

}
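
The note in the listing about splitting the 32-bit message into fields can be sketched as follows. The layout (an 8-bit opcode in the top byte and a 24-bit argument) and the names `MesgPack`, `MesgOpcode` and `MesgArg` are assumptions chosen for illustration; the only constraint taken from the text is that the packed word must never be zero, which the non-zero opcode guarantees here.

```c
#include <assert.h>
#include <stdint.h>

/* hypothetical layout: bits [31:24] opcode | bits [23:0] argument */
#define MESG_OPCODE_SHIFT 24u
#define MESG_ARG_MASK     0x00FFFFFFu

/* The opcode must be non-zero so that the packed word is never 0. */
uint32_t MesgPack(uint8_t opcode, uint32_t arg)
{
    assert(opcode != 0u);
    return ((uint32_t)opcode << MESG_OPCODE_SHIFT) | (arg & MESG_ARG_MASK);
}

/* Extract the opcode from the top byte. */
uint8_t MesgOpcode(uint32_t mesg)
{
    return (uint8_t)(mesg >> MESG_OPCODE_SHIFT);
}

/* Extract the 24-bit argument. */
uint32_t MesgArg(uint32_t mesg)
{
    return mesg & MESG_ARG_MASK;
}
```

A sender would pass the result of MesgPack() as the message word, and the receiver would decode it with the two accessors after the word arrives.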

3.8. Message Queues

The classic Message Queue on UNIX SVR4 is defined as the 'head of a linked list of messages'. Some RTOSes implement Message Queues using linked lists, in which case a central pool of buffers might exist.

The design approach in RK0 does not use lists for message queues  — lists add processing and memory overhead and are suitable for unbounded queues.

As unbounded is a forbidden word in RK0 design, every message queue has a fixed capacity and a dedicated pool of buffers, avoiding contention.

Two mechanisms for enqueueing messages are offered:

  • A Mail Queue (or a Queue) is a 'multi-item' Mailbox: it holds multiple generic pointers as messages.

  • A Stream Queue (or a Stream) is a ring buffer of N fixed-size messages (word-aligned). Streams perform deep copies - from sender storage to the stream buffer, and from the stream buffer to receiver storage.

They are offered as different mechanisms because they have different best-use cases.

3.8.1. Mail Queue

Queue Control Block

  • Buffer Address

  • Write Position

  • Read Position

  • Max. number of mails

  • Current number of mails

  • Waiting queue

  • Owner Task

Mail Queues (or just Queues) are Mailboxes that can hold several messages in a FIFO queue. Indeed, a Mail Queue with a size of 1 will behave as a Mailbox.

The programmer must provide a buffer to hold N message addresses for a Queue. The main primitives are post(), pend(), peek(), and jam().

Peek reads the Queue front message without extracting it, and Jam places a message on the queue front so that this message will be Last-In-First-Out.

Mails will be enqueued in a FIFO order (except when using jam()).
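
To make the difference between post() and jam() concrete, here is a single-threaded model of a queue of message addresses. It is a sketch under assumptions: `ModelQueue_t`, `QUEUE_SLOTS` and the function names are hypothetical, blocking is replaced by a `false` return, and the way the kernel itself moves its read and write positions may differ.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define QUEUE_SLOTS 4u /* hypothetical capacity */

typedef struct
{
    void    *slot[QUEUE_SLOTS]; /* buffer of message addresses */
    unsigned head;              /* read position  */
    unsigned tail;              /* write position */
    unsigned count;             /* current number of mails */
} ModelQueue_t;

/* post: append at the back (FIFO order) */
bool QueueTryPost(ModelQueue_t *q, void *mail)
{
    assert(q != NULL);
    if (q->count == QUEUE_SLOTS)
    {
        return false;
    }
    q->slot[q->tail] = mail;
    q->tail = (q->tail + 1u) % QUEUE_SLOTS;
    q->count++;
    return true;
}

/* jam: step the read position back and place the mail at the front */
bool QueueTryJam(ModelQueue_t *q, void *mail)
{
    assert(q != NULL);
    if (q->count == QUEUE_SLOTS)
    {
        return false;
    }
    q->head = (q->head + QUEUE_SLOTS - 1u) % QUEUE_SLOTS;
    q->slot[q->head] = mail;
    q->count++;
    return true;
}

/* peek: read the front mail without extracting it */
bool QueuePeek(const ModelQueue_t *q, void **out)
{
    assert(q != NULL && out != NULL);
    if (q->count == 0u)
    {
        return false;
    }
    *out = q->slot[q->head];
    return true;
}

/* pend: extract the front mail */
bool QueueTryPend(ModelQueue_t *q, void **out)
{
    if (!QueuePeek(q, out))
    {
        return false;
    }
    q->head = (q->head + 1u) % QUEUE_SLOTS;
    q->count--;
    return true;
}
```

Posting A, then B, then jamming C yields C, A, B on the receiving side: the jammed mail jumps the line, and the rest keep their FIFO order.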

A single-slot Queue behaves as a Mailbox. Still, Mailboxes are provided as a distinct service from Queues because a Queue Control Block is roughly three times larger than a Mailbox, plus Queue methods are considerably heavier. As mailboxes are extremely handy, providing them as a standalone mechanism allows them to be composed with other features while keeping queues disabled entirely.

For both Queues and Mailboxes, if your message is a 4-byte message, such as a UINT value, it can (and probably should) be passed by copy: just cast to (VOID*) when transmitting, and cast back to UINT when receiving. Yet, this should be an option only if you are unwilling to use Streams.
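
A minimal sketch of that cast, in standard C so it can be tried on a host machine: `MailFromValue` and `ValueFromMail` are hypothetical helpers, and going through `uintptr_t` is my choice to keep the conversion well defined where pointers are wider than 32 bits. One caveat follows from the Mailbox semantics: a value of 0 becomes a NULL pointer, which a Mailbox treats as EMPTY, so the contract probably has to exclude 0 (the assertion below enforces that).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Carry a small unsigned value in a pointer-sized mail slot. */
void *MailFromValue(uint32_t value)
{
    assert(value != 0u); /* 0 would read as NULL, i.e. an EMPTY mailbox */
    return (void *)(uintptr_t)value;
}

/* Recover the value on the receiving side. */
uint32_t ValueFromMail(void *mail)
{
    return (uint32_t)(uintptr_t)mail;
}
```

The pointer produced this way must never be dereferenced; it is only a container for the value.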

3.8.1.1. Usage Example: Work Queue

This example demonstrates implementing a work queue pattern using a Mail Queue.

Multiple producer tasks (Sensor, PID Controller, and UI) create Job_t objects and submit their addresses to a Mail Queue (jobQueue).

In this example, the worker thread logs system activity. As it runs at the lowest priority, it maintains system responsiveness with minimal intrusion.

The same pattern can support actual processing. You could either embed a function pointer in each job for fully dynamic behaviour or define a command ID and use a central dispatch table in the worker thread to invoke appropriate handlers. These are all implementations of the Active Object Pattern.
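
As a sketch of the second option (a command ID plus a central dispatch table), independent of the logging example that follows: the names `CmdId_t`, `CmdJob_t`, `dispatchTable` and `WorkerDispatch` are hypothetical, and the two handlers only record that they ran. In a real worker, WorkerDispatch() would be called on each job received from the queue.

```c
#include <assert.h>
#include <stddef.h>

typedef enum { CMD_LOG = 0, CMD_BLINK, CMD_COUNT } CmdId_t; /* hypothetical IDs */

typedef struct
{
    CmdId_t  id;
    unsigned arg;
} CmdJob_t;

typedef void (*CmdHandler_t)(const CmdJob_t *job);

/* observable side effects, so the sketch can be checked */
static unsigned logCalls;
static unsigned blinkArg;

static void HandleLog(const CmdJob_t *job)   { (void)job; logCalls++; }
static void HandleBlink(const CmdJob_t *job) { blinkArg = job->arg; }

/* central dispatch table, indexed by command ID */
static const CmdHandler_t dispatchTable[CMD_COUNT] = {
    [CMD_LOG]   = HandleLog,
    [CMD_BLINK] = HandleBlink,
};

/* Returns 0 when a handler ran, -1 when the job is missing or unknown. */
int WorkerDispatch(const CmdJob_t *job)
{
    if (job == NULL || (unsigned)job->id >= (unsigned)CMD_COUNT)
    {
        return -1; /* reject instead of indexing out of the table */
    }
    assert(dispatchTable[job->id] != NULL);
    dispatchTable[job->id](job);
    return 0;
}
```

Compared with embedding a function pointer in each job, the table keeps jobs small and lets the worker validate the ID before calling anything.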

/* Job_t and queue definitions */
#define MAX_JOBS 8

typedef struct {
    BYTE length;
    BYTE payload[64];
} Job_t;

static Job_t jobPoolBuf[MAX_JOBS]  __attribute__((section("_user_heap")));
static RK_MEM jobPool;
static Job_t *jobQueueBuf[MAX_JOBS];
static RK_QUEUE jobQueue;

/* Plant model state */
static volatile float plantTemp = 25.0f;
static const float ambientTemp = 20.0f;

/* Convert plantTemp to integer for logging */
INT readTemp(VOID)
{
    return (INT)plantTemp;
}

/* Simulate button every 2s */
INT buttonPressed(VOID)
{
    return ((kTickGet() % 2000) < 20); /* the condition will hold true for 20ms every 2s */
}

VOID kApplicationInit( VOID)
{
    kMemInit( &jobPool, jobPoolBuf, sizeof(Job_t), MAX_JOBS);
    kQueueInit( &jobQueue, jobQueueBuf, MAX_JOBS);
}

/* PID Controller Task (High priority) */
/* note: this is a sloppy zero-effort tuning
just for printing something */
VOID PIDControllerTask( VOID *args)
{
    RK_UNUSEARGS

    const float Kp=1.0f, Ki=0.1f, Kd=0.05f;
    float prev=plantTemp;
    float integral=0.0f;
    const float dt=0.5f;

    while(1)
    {
        /* Read plant state */
        float measure = plantTemp;
        /* PID compute */
        float error = 25.0f - measure;
        integral += error * dt;
        float derivative = (measure - prev) / dt;
        float output = Kp*error + Ki*integral - Kd*derivative;
        prev = measure;

        /* Apply to plant model */
        /* the plant cooling model: (temp-amb)*0.1  */
        plantTemp += (output - (plantTemp - ambientTemp)*0.1f) * dt;

        /* Post log job */

        Job_t *jobPtr = kMemAlloc( &jobPool);
        if(jobPtr)
        {
            CHAR buf[32];
            formatFloat (buf, sizeof(buf), output);
            snprintf( (CHAR*)jobPtr->payload, sizeof(jobPtr->payload),
                     "[CTRL] O=%s T=%d", buf, readTemp());
            jobPtr->length = strlen((CHAR*)jobPtr->payload);
            if(kQueuePost( &jobQueue, jobPtr, RK_NO_WAIT) != RK_SUCCESS)
            {
            /* the worker thread is the one freeing the memory blocks;
             if the queue is full and we do not want to block,
             we free the allocated block here; otherwise, it would leak
            */
                kMemFree( &jobPool, jobPtr);
            }
        }
        kSleepUntil( 500);
    }
}

/* Sensor Task (Mid priority) */
VOID TempSensorTask( VOID *args)
{
    RK_UNUSEARGS
    while(1)
    {
        Job_t *jobPtr = kMemAlloc(&jobPool);
        if(jobPtr)
        {
            snprintf( (CHAR*)jobPtr->payload, sizeof(jobPtr->payload),
                     "[SENSOR] T=%dC", readTemp());
            jobPtr->length = strlen((CHAR*)jobPtr->payload);
            if(kQueuePost( &jobQueue, jobPtr, RK_NO_WAIT) != RK_SUCCESS)
            {
                kMemFree( &jobPool, jobPtr);
            }
        }
        kSleepUntil(1000);
    }
}

/* UI Task (Low priority) */
/* this is to cause a temperature disturbance */
VOID UIButtonTask( VOID *args)
{
    RK_UNUSEARGS
    while(1)
    {
        if(buttonPressed())
        {
            plantTemp -= plantTemp*0.15f; /* disturb the temperature */
            Job_t *jobPtr = kMemAlloc( &jobPool);
            if(jobPtr)
            {
                snprintf((CHAR*)jobPtr->payload, sizeof(jobPtr->payload),
                         "[BTN] Temp: %d", (INT)plantTemp);
                jobPtr->length = strlen((CHAR*)jobPtr->payload);
                if(kQueuePost( &jobQueue, jobPtr, RK_NO_WAIT) != RK_SUCCESS)
                {
                    kMemFree( &jobPool, jobPtr);
                }
            }
        }
        kSleepUntil( 2000);
    }
}

/* Worker Task (Lowest priority) */
VOID WorkerTask( VOID *args)
{
    RK_UNUSEARGS
    Job_t *jobPtr = NULL;
    while(1)
    {
        if(kQueuePend( &jobQueue, (VOID**)&jobPtr, RK_WAIT_FOREVER)==RK_SUCCESS)
        {
            printf("[WORKER] %s\r\n", jobPtr->payload);
            kMemFree( &jobPool, jobPtr);
        }
    }
}
[Figure: workqueue]

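The command-ID variant mentioned earlier (a central dispatch table in the worker) can be sketched on a host as follows. Everything here is hypothetical: the command names, CmdJob_t, the handlers and dispatch() are not part of RK0, and the counters exist only so the behaviour can be inspected. In the real worker, dispatch() would be called on each job pointer received from the Mail Queue.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical command IDs; the names are illustrative only */
typedef enum { CMD_LOG = 0, CMD_BLINK, CMD_STORE, CMD_COUNT } Cmd_t;

typedef struct
{
    Cmd_t cmd;                 /* which handler the worker should run */
    unsigned char length;      /* valid bytes in payload */
    unsigned char payload[64];
} CmdJob_t;

typedef int (*Handler_t)(const CmdJob_t *job);

static int handled[CMD_COUNT]; /* per-command counters, for inspection */

static int onLog(const CmdJob_t *job)   { (void)job; return ++handled[CMD_LOG]; }
static int onBlink(const CmdJob_t *job) { (void)job; return ++handled[CMD_BLINK]; }
static int onStore(const CmdJob_t *job) { return handled[CMD_STORE] += job->length; }

/* Central dispatch table, indexed by command ID */
static const Handler_t dispatchTable[CMD_COUNT] = {
    [CMD_LOG] = onLog, [CMD_BLINK] = onBlink, [CMD_STORE] = onStore
};

/* What the worker would call for each job it dequeues.
   Returns -1 for a null job or an unknown command. */
static int dispatch(const CmdJob_t *job)
{
    if (job == NULL || (unsigned)job->cmd >= CMD_COUNT)
        return -1;
    assert(dispatchTable[job->cmd] != NULL);
    return dispatchTable[job->cmd](job);
}
```

Compared to a function pointer embedded in each job, the table keeps the set of possible actions closed and checkable, at the cost of one extra lookup.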
3.8.1.2. A Logger Pattern

Here is a Logger Pattern you can reuse leveraging the Mail Queue.

#include <stdio.h> /* needed for printf */
#include <stdarg.h> /* needed for va_list, va_start, va_end */
#include <kstring.h> /* if including <string.h> RK_* custom string operations are remapped to string.h */

/* this version uses printf. one could just store data on the queue to inspect */

#define LOGLEN     64
#define LOGBUFSIZ  16

 struct log
 {
     RK_TICK   t;
     CHAR      s[LOGLEN];
 }__K_ALIGN(4);

typedef struct log Log_t;

/* memory partition pool */
Log_t  qMemBuf[LOGBUFSIZ]  __attribute__((section("_user_heap")));

/* buffer for the mail queue */
 Log_t  *qBuf[LOGBUFSIZ];

/* mail queue */
 RK_QUEUE    logQ;

/* mem allocator */
 RK_MEM      qMem;


/* (v)printf needs a custom _write() backend syscall, typically using UART */
 void kprintf(const char *fmt, ...)
 {
         va_list args;
         va_start(args, fmt);
         /* printing to stderr; if there is no particular stderr (fd==2) redirection on _write(), this has the same effect of vprintf(fmt, args), using stdout (fd==1) */
         vfprintf(stderr, fmt, args);
         va_end(args);
 }


 VOID logPost(const char *fmt, ...)
 {
     kSchLock();
     Log_t *logPtr = kMemAlloc(&qMem);
     if (logPtr)
     {
         logPtr->t = kTickGetMs();
         va_list args;
         va_start(args, fmt);
         int len = vsnprintf(logPtr->s, sizeof(logPtr->s), fmt, args);
         va_end(args);
         /* if len >= size of the buffer, the message has been truncated */
         if (len >= (int)sizeof(logPtr->s))
         {
             /* put "..." at the end of the buffer to mark the truncation;
                index from the buffer size: len is the untruncated length
                and may lie beyond the buffer */
             RK_STRCPY(&logPtr->s[sizeof(logPtr->s) - 4], "...");
         }
         /* if queue post fails, free memory so it doesnt leak */
         if (kQueuePost(&logQ, (VOID*)logPtr, RK_NO_WAIT) != RK_SUCCESS)
             kMemFree(&qMem, logPtr);
     }
     kSchUnlock();
 }


/* you can make it as a module and publish the logPost API  */

/* Usage Example: */

VOID someTask(VOID* args)
{
  ...

  while(1)
  {

    UINT value = getValue();
    logPost("Value is %u", value);

   }

}

/* If printing to a terminal, use a low-priority LogTask: */
VOID LogTask(void* args)
 {
     (void)args;
     while (1)
     {
         Log_t* logPtr;
         if (kQueuePend(&logQ, (VOID**)&logPtr, RK_WAIT_FOREVER) == RK_SUCCESS)
         {
             kprintf("%lu ms :: %s\r\n", logPtr->t, logPtr->s);
             kMemFree(&qMem, logPtr);
         }
     }
 }
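
The formatting and truncation step of logPost() can be exercised on a host before going to the target. The sketch below uses only the standard C library; formatLog() is a hypothetical name. It places the "..." marker by indexing from the buffer size, since vsnprintf() returns the length the untruncated text would have had, which can lie beyond the buffer.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>

/* Host-side model of the formatting step; formatLog() is a
   hypothetical name, not an RK0 call.
   Returns 1 if the text was truncated, 0 otherwise. */
static int formatLog(char *dst, size_t size, const char *fmt, ...)
{
    va_list args;
    assert(dst != NULL && size > 4);
    va_start(args, fmt);
    /* vsnprintf returns the length the full text WOULD have had */
    int len = vsnprintf(dst, size, fmt, args);
    va_end(args);
    if (len < 0)
    {
        dst[0] = '\0'; /* encoding error: publish an empty string */
        return 0;
    }
    if ((size_t)len >= size)
    {
        /* overwrite the last three characters kept in the buffer;
           index from the buffer size, never from len */
        memcpy(&dst[size - 4], "...", 4); /* copies the NUL as well */
        return 1;
    }
    return 0;
}
```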

3.8.2. Stream Queue

Message Stream Control Block

  • Storage address

  • Write Address

  • Read Address

  • Message Block Size

  • Max of messages

  • Message Count

  • Owner Task

Streams resemble classic (named) Pipes. The difference is that Stream messages have a fixed size, whereas pipes transmit and receive any number of bytes on each operation.

For each Stream, the user provides a buffer address with enough capacity (number of messages x message size). Then, the kernel will handle it as a ring buffer.

The message size associated with a Message Stream instance is defined at its initialisation. On transmission, a message is deep copied from the sender’s storage to the queue; on reception, it moves from the queue to the receiver’s storage.

Although a message size is associated with a Stream Queue object, the concrete message type depends on the application.

The important primitives for Message Streams are send(), recv(), jam() and peek().

Sending to a full queue (optionally) blocks the sender. Likewise, receiving from an empty queue (optionally) blocks the receiver.
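
To make the four primitives concrete, here is a single-threaded host model of a ring of fixed-size messages. It is a sketch, not the RK0 implementation: all names are illustrative, nothing blocks (a full or empty ring simply returns -1), and it assumes that jam() places a message in front of the oldest one and that peek() reads the oldest message without removing it.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Single-threaded host model of a fixed-size message ring.
   All names are illustrative; this is not the RK0 implementation. */
typedef struct
{
    uint32_t *buf;    /* storage: nMesg * mesgWords words */
    size_t mesgWords; /* message size, in words */
    size_t nMesg;     /* capacity, in messages */
    size_t head;      /* index of the oldest message */
    size_t count;     /* messages currently stored */
} Stream_t;

static void streamInit(Stream_t *s, uint32_t *buf, size_t mesgWords, size_t nMesg)
{
    assert(s && buf && mesgWords > 0 && nMesg > 0);
    s->buf = buf; s->mesgWords = mesgWords; s->nMesg = nMesg;
    s->head = 0; s->count = 0;
}

static void copyWords(uint32_t *dst, const uint32_t *src, size_t n)
{
    while (n--) *dst++ = *src++;
}

/* send: deep copy into the tail; -1 if full */
static int streamSend(Stream_t *s, const uint32_t *mesg)
{
    if (s->count == s->nMesg) return -1;
    size_t tail = (s->head + s->count) % s->nMesg;
    copyWords(&s->buf[tail * s->mesgWords], mesg, s->mesgWords);
    s->count++;
    return 0;
}

/* jam: deep copy in FRONT of the oldest message; -1 if full */
static int streamJam(Stream_t *s, const uint32_t *mesg)
{
    if (s->count == s->nMesg) return -1;
    s->head = (s->head + s->nMesg - 1) % s->nMesg;
    copyWords(&s->buf[s->head * s->mesgWords], mesg, s->mesgWords);
    s->count++;
    return 0;
}

/* peek: copy the oldest message out without removing it; -1 if empty */
static int streamPeek(const Stream_t *s, uint32_t *out)
{
    if (s->count == 0) return -1;
    copyWords(out, &s->buf[s->head * s->mesgWords], s->mesgWords);
    return 0;
}

/* recv: copy the oldest message out and remove it; -1 if empty */
static int streamRecv(Stream_t *s, uint32_t *out)
{
    if (streamPeek(s, out) != 0) return -1;
    s->head = (s->head + 1) % s->nMesg;
    s->count--;
    return 0;
}
```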

3.8.2.1. Stream Message-Size

Stream Queues must have a fixed message size that is a multiple of a WORD. Besides, the number of words must be a power of two: 1, 2, 4, 8, 16, 32... (words).

RK0 does not establish an upper bound, although I would say that a good cap is 4 words for the regular RK0 target. One has to experiment, though. If a message becomes too large, it introduces prohibitive latency, so the user needs to transmit the message address—i.e., configure the Stream to carry a 1-word message size.

  • Load/Store instructions are optimised to fetch 32-bit words. If messages are aligned on a 4-byte boundary, each word is moved with a single load and a single store.

  • A power-of-two constraint is a CPU-aware design choice to prevent misalignment issues.

  • Misaligned memory makes casts unsafe, leading to complex faults, performance penalties or undefined behaviour.

Deep Copies are usually needed for message passing but introduce significant overhead.

Design choice: Be CPU-aware and constrain data size to power of two words.

Benefits: speeds up the copy, achieves more deterministic behaviour, improves run-time safety.
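
A quick way to check a candidate message type against this constraint, sketched for a host build. The helper names and Sample_t are hypothetical, and a word is assumed to be 32 bits as in the rest of this section.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Returns 1 if a message size given in BYTES is a whole number of
   32-bit words and that number of words is a power of two. */
static int mesgSizeIsValid(size_t bytes)
{
    if (bytes == 0 || (bytes % sizeof(uint32_t)) != 0)
        return 0;
    size_t words = bytes / sizeof(uint32_t);
    /* a power of two has exactly one bit set */
    return (words & (words - 1)) == 0;
}

/* Converts a valid size in bytes to the size in words */
static size_t mesgSizeInWords(size_t bytes)
{
    assert(mesgSizeIsValid(bytes));
    return bytes / sizeof(uint32_t);
}

/* Hypothetical 2-word message type */
typedef struct { uint32_t type; uint32_t value; } Sample_t;
```

A 3-word structure (12 bytes) fails the check; padding it to 4 words is the usual way out.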

Code-wise, we optimise using pointer arithmetic on pointers to words:

/* Optimised deep copy; guaranteed mesgSize>0 */
/* destPtr and srcPtr are pointers to a word */
/* note: mesgSize is decremented by the macro */
#define RK_CPY(destPtr, srcPtr, mesgSize)                   \
do {                                                        \
      /* if mesgSize is 1, the loop body is NOT executed */ \
      while (--mesgSize)                                    \
      {                                                     \
          *(destPtr++) = *(srcPtr++);                       \
      }                                                     \
      /* the last or the only copy is executed now */       \
      *(destPtr++) = *(srcPtr++);                           \
} while(0U)

3.8.2.2. Usage Example: Averaging Sensor Values

Below is an illustrative snippet of a Queueing Pattern.

The goal is to calculate the average value of 4 types of sensors.

It is convenient to highlight an important aspect here: given its reactive nature, real-time system software is typically I/O-bound. Tasks that are sensitive to I/O activity have higher priority than CPU-bound tasks, i.e., those processing data.

A task receives measured sensor values from an ISR at a periodic rate. (A Soft Timer emulates the ISR.)

Then it enqueues this data to a consumer that will compute the average value for each of the 4 sensors.

The inter-task communication is designed as follows:

  1. The producer pends on a Mailbox that an ISR posts to. An application timer emulates this ISR.

  2. The data extracted from the Mailbox is placed in a queue with the processing task as the consumer.

  3. As the producer’s priority must be higher than that of the consumer, eventually, the queue will get full.

  4. The producer drops the last message when the queue is full and signals the consumer.

  5. Now the consumer has a batch of data to work until the next sensor update. It will block (pend on a signal) whenever the queue is empty.

Here, the queue size was set at 8 items. This is an arbitrary value; the optimal queue size would take into account system edge cases: 'What is the state that represents the most inconvenient time to be interrupted by this sensor?'

#define kPend(timeout)                                \
    do                                                \
    {                                                 \
        kSignalGet(0x1, RK_FLAGS_ANY, NULL, timeout); \
    } while (0)

#define kSignal(taskhandle)           \
    do                                \
    {                                 \
        kSignalSet(taskhandle, 0x01); \
    } while (0)



typedef enum
{
	TEMPERATURE=1, HUMIDITY, CO2, FLOW
}SensorType_t;



/* sensor types */
struct sensorMsg
{
    SensorType_t sensorType;
    ULONG sensorValue;

};

typedef struct sensorMsg Mesg_t;

#define N_MESSAGE 8
#define MESSAGE_SIZE (sizeof(Mesg_t)/4) /* WORDS! */
#define N_SENSOR    4
#define AVG_WINDOW_SIZE   10 /* 10 samples */

RK_STREAM sensorStream;/* the stream kobject */
Mesg_t mesgBuf[N_MESSAGE] = {0};/* queue buffer */
RK_TIMER timerT1;
RK_MBOX sensorBox;
static Mesg_t sample = {0};
static UINT sampleErr;

VOID callBackISR(VOID* ARGS);

VOID kApplicationInit( VOID)
{
    RK_ERR err = kStreamInit(&sensorStream, (VOID*) mesgBuf, MESSAGE_SIZE,
    N_MESSAGE);

    kassert(err==RK_SUCCESS);

    /* timer @ every 10 ms */
    err = kTimerInit(&timerT1, 0, 10, callBackISR, NULL, RK_TIMER_RELOAD);
    kassert( err==RK_SUCCESS);

    err = kMboxInit(&sensorBox, NULL);
    kassert( err==RK_SUCCESS);
}

VOID callBackISR(VOID *args)
{
    RK_UNUSEARGS
    sample.sensorType = (rand() % 4) + 1;
    switch (sample.sensorType)
    {
        case TEMPERATURE:
            sample.sensorValue = ( ULONG) rand() % 50;
            break;
        case HUMIDITY:
            sample.sensorValue = ( ULONG) rand() % 100;
            break;
        case CO2:
            sample.sensorValue = ( ULONG) rand() % 1000;
            break;
        case FLOW:
            sample.sensorValue = ( ULONG) rand() % 10;
            break;
        default:
            break;
    }
    RK_ERR err = kMboxPost( &sensorBox, &sample, RK_NO_WAIT);
    if (err != RK_SUCCESS)
        sampleErr ++;

}

/* Producer - higher priority, blocks on mailbox */
VOID Task1(VOID *args)
{
    RK_UNUSEARGS
    Mesg_t *recvSample = NULL;
    while (1)
    {
        RK_ERR errmbox = kMboxPend( &sensorBox, ( VOID**) &recvSample,
                RK_WAIT_FOREVER);
        kassert( errmbox==RK_SUCCESS);
        RK_ERR err = kStreamSend( &sensorStream, recvSample, RK_NO_WAIT);
        kassert(err >= 0); /* either successful or unsuccessful */
        if (err == RK_SUCCESS)
        {
            CHAR const *sensorTypeStr = NULL;
            if (recvSample->sensorType == 1)
                sensorTypeStr = "TEMP";
            if (recvSample->sensorType == 2)
                sensorTypeStr = "HUM";
            if (recvSample->sensorType == 3)
                sensorTypeStr = "CO2";
            if (recvSample->sensorType == 4)
                    sensorTypeStr = "FLOW";
            printf( "ENQ: [@%lums, %s, %lu] \r\n", kTickGet(), sensorTypeStr,
                        recvSample->sensorValue);
        }
        else if (err == RK_ERR_STREAM_FULL)
        {
            kSignal(task2Handle);
        }
    }
}

/* for each sensor:
 . a ring buffer of AVG_WINDOW_SIZE values
 . sum of values
 . an index table (=enum - 1, e.g., HUMIDITY IDX=2-1=1)
 */
static ULONG ringBuf[N_SENSOR][AVG_WINDOW_SIZE];
static ULONG ringSum[N_SENSOR] = {0};
static UINT ringIndex[N_SENSOR] = {0};

void Task2( void *args)
{

    RK_UNUSEARGS
    Mesg_t readSample;
    while (1)
    {
        RK_ERR err = kStreamRecv(&sensorStream, (VOID*)&readSample,
        RK_NO_WAIT);
        if (err == RK_SUCCESS)
        {
            UINT sensorIdx = readSample.sensorType - 1;

/* remove oldest sample */
            ULONG oldest = ringBuf[sensorIdx][ringIndex[sensorIdx]];
            ringSum[sensorIdx] -= oldest;

/* push new sample */
            ringBuf[sensorIdx][ringIndex[sensorIdx]] = readSample.sensorValue;
            ringSum[sensorIdx] += readSample.sensorValue;

/* index incr-wrap */
            ringIndex[sensorIdx] ++;
            ringIndex[sensorIdx] %= AVG_WINDOW_SIZE;

/* simple average */
            ULONG avg = ringSum[sensorIdx] / AVG_WINDOW_SIZE;


            CHAR const *sensorTypeStr = NULL;
            if (readSample.sensorType == 1)
                sensorTypeStr = "TEMP";
            if (readSample.sensorType == 2)
                sensorTypeStr = "HUM";
            if (readSample.sensorType == 3)
                sensorTypeStr = "CO2";
            if (readSample.sensorType == 4)
                sensorTypeStr = "FLOW";

            printf( "DEQ: [@%lums, %s, %lu] | AVG: %lu \r\n", kTickGet(),
                    sensorTypeStr, readSample.sensorValue, avg);

        }
        else
        {
            kPend(RK_WAIT_FOREVER);
        }

    }
}
OUTPUT:

ENQ: [@550ms, CO2, 571]
ENQ: [@560ms, FLOW, 4]
ENQ: [@570ms, FLOW, 4]
ENQ: [@580ms, HUM, 25]
ENQ: [@590ms, CO2, 931]
ENQ: [@600ms, CO2, 487]
ENQ: [@610ms, FLOW, 7]
ENQ: [@620ms, HUM, 79]

>>> Queue is full. Now offload and process. Note the order remains <<<

DEQ: [@630ms, CO2, 571] | AVG: 460
DEQ: [@631ms, FLOW, 4] | AVG: 5
DEQ: [@632ms, FLOW, 4] | AVG: 5
DEQ: [@633ms, HUM, 25] | AVG: 52
DEQ: [@634ms, CO2, 931] | AVG: 553
DEQ: [@635ms, CO2, 487] | AVG: 549
DEQ: [@636ms, FLOW, 7] | AVG: 5
DEQ: [@637ms, HUM, 79] | AVG: 55

>>> Consumer is preempted <<<
ENQ: [@640ms, CO2, 913]
ENQ: [@650ms, CO2, 134]
ENQ: [@660ms, HUM, 47]
ENQ: [@670ms, HUM, 30]
ENQ: [@680ms, TEMP, 7]
ENQ: [@690ms, CO2, 726]
ENQ: [@700ms, FLOW, 7]
ENQ: [@710ms, TEMP, 43]

DEQ: [@720ms, CO2, 913] | AVG: 578
DEQ: [@721ms, CO2, 134] | AVG: 543
DEQ: [@722ms, HUM, 47] | AVG: 51
DEQ: [@723ms, HUM, 30] | AVG: 44
DEQ: [@724ms, TEMP, 7] | AVG: 20
DEQ: [@725ms, CO2, 726] | AVG: 592
DEQ: [@726ms, FLOW, 7] | AVG: 5
DEQ: [@727ms, TEMP, 43] | AVG: 23

3.8.3. Summing Up: Stream Queues vs Mail Queues

While both are Message Queues, their distinct designs lead to different ideal use cases. Note that Mail Queues are particularly difficult to generalise.

Feature: Mail Queue (Pointer-Based) vs Stream Queue (Deep Copy-Based)

Message Storage

  • Mail Queue: Stores pointers to messages.

  • Stream Queue: Stores deep copies of messages.

Message Size

  • Mail Queue: Either pointer-sized or can vary for each message.

  • Stream Queue: Fixed (defined at queue initialisation).

Memory Management

  • Mail Queue: Internal pre-allocated (1 pointer/message). It might need a second storage.

  • Stream Queue: Internal (pre-allocated buffer, N-words/message).

Data Ownership

  • Mail Queue: Sender/receiver manage lifecycle.

  • Stream Queue: Kernel.

Performance

  • Mail Queue: A 'zero-copy' transmission is faster.

  • Stream Queue: Deterministic. Kernel-optimised deep copy.

Best Use Cases

  • Mail Queue: Work Queues, Client-Server with dynamic payload, any case where zero-copy or 1-copy is feasible.

  • Stream Queue: Real-time data streaming (e.g., sensor pipelines, inter-device communication).

3.9. Message Passing ownership

Using queues to communicate between multiple tasks is chaos. Many senders to many receivers can be unpredictable. We often want N:1 (senders:receiver, N can be 1). This 1 makes it easier to reason about the dynamics.

In real-time design, we often expect to see blocking send() operations on 1:1 or N:1 channels—a blocking send() on a 1:N (broadcast) would be very odd.

3.9.1. Priority Inversion on Message Passing

Priority Inversion happens on Message-Passing for similar but subtly different reasons from resource sharing.

Design Choice: Add an ownership mechanism for a message-passing object—a well-defined receiver—so priority propagation can be applied.

Benefit: This preserves strict real-time guarantees, making sure a high-priority task never waits indefinitely for a lower-priority task to finish message operations.

While sharing some similarities, there are subtle differences between blocking on a shared resource (by blocking on a locked mutex) and blocking on a message-passing object.

Assuming cases in which we do not want messages to be overwritten, a sender, when accessing a queue, is acquiring an empty buffer, while a receiver is acquiring a full buffer. They are competing for the same object but in different states. Thus, they depend on each other to change the object state.

When a sender blocks on a full shared message object, it does not mean another writer is using the resource. By design, it is also unlikely that there is a reader blocked on the waiting queue of the object, since every time a write operation completes, any reader blocked on the queue is readied. Whether it is dispatched or not is a scheduler concern: if its priority is higher than that of the task that has just finished writing, it will be immediately dispatched; if not, it is enqueued in the ready queue until it is eventually picked up.

This means the priority inversion problem arises from waiting for the consumer rather than from direct contention among multiple senders.

So, if the sender’s priority is higher, it could be propagated to the reader. But, which reader? (This is why semaphores cannot implement a priority inheritance protocol: the waiting task cannot know its potential signaller.)

With that in mind, there is the option to set ownership: set owner (mesgpass, task handle). From now on, only the owner task can receive from that service object—a blocking send() knows the target task and can raise its priority.

(As 1:N communication is normally non-blocking on real-time systems, there is no mechanism to establish 'sender ownership'.)

If another task that is not the owner tries to receive from a kernel message-passing object that has an owner, it fails.
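
The ownership rule and the priority propagation it enables can be modelled on a host as below. This is only a bookkeeping sketch: Task_t, Port_t and the functions are illustrative names, not RK0 types or calls, nothing actually blocks, and the point at which the boost is undone (here, as soon as the owner frees a slot) is an assumption of the model. A lower number means a higher priority.

```c
#include <assert.h>
#include <stddef.h>

/* Host model only: Task_t, Port_t and the functions below are
   illustrative names, not RK0 types or calls.
   Lower number = higher priority. */
typedef struct
{
    unsigned nominalPrio; /* priority assigned at creation */
    unsigned prio;        /* effective priority (may be boosted) */
} Task_t;

typedef struct
{
    Task_t *owner;  /* the only task allowed to receive, or NULL */
    unsigned count; /* messages stored */
    unsigned max;   /* capacity */
} Port_t;

static void portSetOwner(Port_t *p, Task_t *owner) { p->owner = owner; }

/* A sender that finds the port full would block here. Since the
   receiver is known, its priority can be raised to the sender's.
   Returns 0 if sent, 1 if the sender would block. */
static int portSend(Port_t *p, Task_t *sender)
{
    assert(p && sender);
    if (p->count < p->max) { p->count++; return 0; }
    if (p->owner && sender->prio < p->owner->prio)
        p->owner->prio = sender->prio; /* propagate priority */
    return 1;
}

/* Only the owner may receive; the boost ends once it makes room.
   Returns 0 on success, -1 if not the owner, 1 if empty. */
static int portRecv(Port_t *p, Task_t *caller)
{
    assert(p && caller);
    if (p->owner && caller != p->owner) return -1;
    if (p->count == 0) return 1;
    p->count--;
    caller->prio = caller->nominalPrio; /* restore after freeing a slot */
    return 0;
}
```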

These kernel objects now will resemble an aspect of Ports, a common way of representing tasks on message-passing kernels. (Strictly, they are not Ports, as RK0 is not a message-passing kernel—although I do like the approach.)

3.10. Most-Recent Message Protocol (MRM)

MRM Control Block

  • MRM Buffer Allocator

  • Data Buffer Allocator

  • Current MRM Buffer Address

  • Data Size (Message Size)

MRM Buffer

  • Data Buffer Address

  • Readers Count

Data Buffer

  • Application-dependent

There is little practical difference between a message that does not arrive and one with no valid (stale) data. But when wrong (or stale) data is processed - e.g., to define a set point on a loop - a system can fail badly.

Design Choice: provide a broadcast asynchronous message-passing scheme that guarantees data freshness and integrity for all readers.

Benefits: The system has a mechanism to meet strict deadlines that cannot be predicted at design time.

Control loops reacting to unpredictable time events—like a robot scanning an environment or a drive-by-wire system—require a different message-passing approach. Readers cannot "look at the past" and cannot block. The most recent data must be delivered lock-free and have guaranteed integrity.

3.10.1. Functional Description

An MRM works as a 1-to-many asynchronous Mailbox - with a lock-free specialisation that enables several readers to get the most recent deposited message with no integrity issues. Whenever a reader reads an MRM buffer, it will find the most recent data transmitted. It can also be seen as an extension of the Double Buffer pattern for a 1:N communication.

The core idea of the MRM protocol is that readers can only access the buffer that is classified as the 'most recent buffer'. After a writer publishes a message with publish(), that will be the only message readers can get(). Any former message still being processed by a reader was grabbed before the new publish() and, from then on, can only be released with unget(), eventually returning to the pool.

To clarify further, the communication steps are listed:

  1. A producer first reserves an MRM Buffer - the reserved MRM Buffer is not available for reading until it is published.

  2. A message buffer is allocated and filled, and its address is stored within an MRM Buffer. The producer publishes the message. From now on, it is the most recent message. Any formerly published buffer is no longer visible to new readers.

  3. A reader starts by getting an MRM Buffer. A get() operation delivers a copy of the message to the reader’s scope. Importantly, this operation increases the number of readers associated with that MRM Buffer.

Before ending its cycle, the task releases (unget()) the buffer; on releasing, the kernel checks if the caller task is the last reader and if the buffer being released is not the current MRM Buffer.

If the above conditions are met, the unget() operation will return the MRM buffer to the pool. If there are more readers, OR if it is the current buffer, it remains available.

When the reserve operation detects that the most recent buffer still has readers, a new buffer is allocated to be written and published. If it has no readers, it is reused.

This way, the worst case is a sequence of publish() with no unget() at all — this would lead to the writer finding no buffer to reserve. This is prevented by making: N Buffers = N tasks + 1.
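
The reserve/publish/get/unget bookkeeping described above can be followed in a single-threaded host model. It is a sketch under assumptions, not the RK0 implementation: the names are illustrative, the payload is a single word, there is no concurrency, and a superseded buffer that has no readers is returned to the pool at publish time (how the kernel handles that corner is not stated here).

```c
#include <assert.h>
#include <stddef.h>

#define N_TASKS 4             /* 1 writer + 3 readers */
#define N_BUFS  (N_TASKS + 1) /* N Buffers = N tasks + 1 */

typedef struct
{
    unsigned data;     /* payload (one word in this model) */
    unsigned nReaders; /* readers currently holding this buffer */
    int inUse;         /* 0 = in the pool */
} MrmBuf_t;

typedef struct
{
    MrmBuf_t pool[N_BUFS];
    MrmBuf_t *current; /* most recent published buffer, or NULL */
} Mrm_t;

/* reserve: reuse the current buffer if nobody is reading it,
   otherwise take a free one from the pool; NULL if none is free */
static MrmBuf_t *mrmReserve(Mrm_t *m)
{
    if (m->current && m->current->nReaders == 0)
        return m->current;
    for (size_t i = 0; i < N_BUFS; i++)
        if (!m->pool[i].inUse) { m->pool[i].inUse = 1; return &m->pool[i]; }
    return NULL;
}

/* publish: fill the reserved buffer and make it the most recent one */
static void mrmPublish(Mrm_t *m, MrmBuf_t *b, unsigned data)
{
    MrmBuf_t *old = m->current;
    b->data = data;
    m->current = b;
    /* model assumption: a superseded buffer with no readers
       goes back to the pool here */
    if (old && old != b && old->nReaders == 0) old->inUse = 0;
}

/* get: copy out the most recent data and count the reader in */
static MrmBuf_t *mrmGet(Mrm_t *m, unsigned *out)
{
    if (m->current == NULL) return NULL;
    *out = m->current->data;
    m->current->nReaders++;
    return m->current;
}

/* unget: count the reader out; the buffer returns to the pool only if
   this was the last reader AND it is no longer the current buffer */
static void mrmUnget(Mrm_t *m, MrmBuf_t *b)
{
    assert(b && b->nReaders > 0);
    b->nReaders--;
    if (b->nReaders == 0 && b != m->current) b->inUse = 0;
}
```

Even when every reader is holding a different, already superseded buffer, the writer still finds one to reserve, which is what the sizing rule is meant to guarantee.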

3.10.2. MRM Control Block Configuration

What might lead to some confusion when initialising an MRM Control Block is the need for two different pools:

  • One pool will be the storage for the MRM Buffers, which is the data structure for the mechanism.

  • Another pool is for the actual payload. The messages.

Both pools must have the same number of elements: the number of tasks communicating + 1.

  • The size of the data buffers is application-dependent - and is passed as a number of words. The minimal message size is 32-bit.

  • If using data structures, keep it aligned to 4 to take advantage of the performance of aligned memory.

3.10.3. Usage Example

Consider a modern car: speed variations are of interest to many modules. With a somewhat "naive" approach, let us consider three modules and how they should react when speed varies:

  1. Cruise Control: For the Cruise Control, a speed increase might signify the driver wants manual control back, and it will likely turn off.

  2. Windshield Wipers: If they are on, a speed change can affect the electric motor’s adjustments to the air resistance.

  3. Radio: Speed changes reflect the aerodynamic noise - the radio volume might need adjustment.

As the variations are unpredictable, we need a mechanism to deliver the latest speed to all these modules, in order of importance. From highest to lowest priority, Cruise, Wipers, and Radio are the three modules, ranging from safety to comfort.

To emulate this scenario, we can write an application with a higher priority task that sleeps and wakes up at pseudo-random times to produce random values that represent the (unpredictable) speed changes.

The snippet below has 4 periodic tasks. Tasks sleep for absolute periods. The producer publishes new data at a random interval, so it can either preempt a reader before it has the chance to finish or stay inactive while a reader runs more than once.

typedef struct
{
	UINT 	speed;
	RK_TICK timeStamp;
} Mesg_t;

#define N_MRM (5)  /* Number of MRM Buffers: N Tasks + 1 */
#define MRM_MESG_SIZE (sizeof(Mesg_t)/4) /* In WORDS */
RK_MRM MRMCtl;/* MRM control block */
RK_MRM_BUF buf[N_MRM];/* MRM pool */
Mesg_t data[N_MRM];/* message data pool */


VOID kApplicationInit( VOID)
{
	kCreateTask(&speedSensorHandle, SpeedSensorTask, RK_NO_ARGS, "SpeedTsk", stack1, STACKSIZE, 1, RK_PREEMPT);
	kCreateTask(&cruiserHandle, CruiserTask, RK_NO_ARGS, "CruiserTsk", stack2, STACKSIZE, 2, RK_PREEMPT);
	kCreateTask(&wiperHandle, WiperTask, RK_NO_ARGS, "WiperTsk", stack3, STACKSIZE, 3, RK_PREEMPT);
	kCreateTask(&radioHandle, RadioTask, RK_NO_ARGS, "RadioTsk", stack4, STACKSIZE, 4, RK_PREEMPT);
    kMRMInit( &MRMCtl, buf, data, N_MRM, MRM_MESG_SIZE);
 }

VOID SpeedSensorTask( VOID *args)
{
    RK_UNUSEARGS

	Mesg_t sendMesg = {0};
	while (1)
    {
        RK_TICK currTick = kTickGet();
        UINT speedValue = (UINT) (rand() % 170) + 1;
		sendMesg.speed = speedValue;
		sendMesg.timeStamp = currTick;
		/* grab a buffer */
        RK_MRM_BUF *bufPtr =  kMRMReserve( &MRMCtl);
        if (bufPtr != NULL)
        {
            kMRMPublish( &MRMCtl, bufPtr,  &sendMesg);
        }
        else
        {/* cannot fail */
            kassert( 0);
        }
        /* log the update that has just been published */
         printf( "! @ %luT: SPEED UPDATE: %u \r\n", currTick, speedValue);
		RK_TICK sleepTicks = (( RK_TICK) rand() % 15) + 1;
        kSleepUntil( sleepTicks);
	}
}

VOID CruiserTask( VOID *args)
{
    RK_UNUSEARGS

	Mesg_t recvMesg = {0};
    while (1)
    {
        RK_MRM_BUF *readBufPtr = kMRMGet( &MRMCtl,  &recvMesg);
		printf( "@ %luT CRUISER: (%u, %luT) \r\n", kTickGet(), recvMesg.speed, recvMesg.timeStamp);
 		kMRMUnget( &MRMCtl, readBufPtr);
		kSleepUntil( 4);

    }
}


VOID WiperTask( VOID *args)
{
    RK_UNUSEARGS
	Mesg_t recvMesg = {0};

    while (1)
    {

        RK_MRM_BUF *readBufPtr = kMRMGet( &MRMCtl,  &recvMesg);
		printf( "@ %luT WIPERS: (%u, %luT) \r\n", kTickGet(), recvMesg.speed, recvMesg.timeStamp);
 		kMRMUnget( &MRMCtl, readBufPtr);
		kSleepUntil( 8);


    }
}
VOID RadioTask( VOID *args)
{
    RK_UNUSEARGS
	Mesg_t recvMesg = {0};

    while (1)

    {
        RK_MRM_BUF *readBufPtr = kMRMGet( &MRMCtl, &recvMesg);
		printf( "@ %luT RADIO: (%u, %luT) \r\n", kTickGet(), recvMesg.speed, recvMesg.timeStamp);
 		kMRMUnget( &MRMCtl, readBufPtr);
 		kSleepUntil(12);

    }

}

Thus, different situations can happen:

  • All tasks read the updated pair (speed, time)

  • Not all tasks receive the updated pair because another update happens in between.

  • No tasks receive an update - because another happens too soon.

  • All tasks receive an update and will keep rereading the same values.

All these cases are on the image:

[Figure: pumpdrop]

The highlight is that controllers can keep their pace, while receiving fresh data - you can see it on the timestamp on the image. Again, they might receive the same data more than once or miss samples; what is important is that they are not lagging and consuming stale data.

4. Error Handling

4.1. Fail fast

While tracing and error handling are yet to be largely improved (and that is when the 1.0.0 version will be released), currently RK0 employs a policy of failing fast in debug mode.

When Error Checking is enabled, every kernel call will be 'defensive', checking for correctness of parameters and invariants, null dereferences, etc.

In these cases, it is more useful to allow the first error to halt the execution by calling an Error Handler function, so the program state can be observed.

A trace structure records the address of the running TCB, its current stack pointer, the link register (that is, the PC at which kErrHandler was called), and a time stamp.

This record is placed in a .noinit RAM section, so it remains visible after a CPU reset. A fault code is stored in a global faultID and in the trace structure. Developers can hook in custom behaviour.

If the kernel is configured to not halt on a fault, but Error Checking is enabled, functions will return negative values in case of an error.

On the other hand, when Error Checking is disabled or NDEBUG is defined, nothing is checked, reducing code size and improving performance.

(Some deeper internal calls have assertions. For those, only defining NDEBUG ensures they are disabled.)

4.2. Stack Overflow

Stack overflow is detected (not prevented) using "stack painting" with a sentinel word. Stack overflow detection is enabled by defining the assembler preprocessor symbol __K_DEF_STACKOVFLW when compiling.
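
For readers unfamiliar with the technique, this is roughly what stack painting amounts to, modelled on a host. The fill pattern, the function names and the descending-stack layout are assumptions of the sketch; the kernel's actual pattern and check are not shown here.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define STACK_PAINT 0xA5A5A5A5u /* hypothetical fill pattern */

/* Fill a task stack with the paint pattern before the task runs */
static void stackPaint(uint32_t *stack, size_t words)
{
    assert(stack != NULL && words > 0);
    for (size_t i = 0; i < words; i++) stack[i] = STACK_PAINT;
}

/* On a descending stack, index 0 is the LAST word to be used: the
   sentinel. If it no longer holds the pattern, the stack has reached
   (or passed) its limit. */
static int stackOverflowed(const uint32_t *stack)
{
    return stack[0] != STACK_PAINT;
}

/* Words never touched since painting: the remaining headroom */
static size_t stackHeadroom(const uint32_t *stack, size_t words)
{
    size_t n = 0;
    while (n < words && stack[n] == STACK_PAINT) n++;
    return n;
}
```

The headroom count is a by-product of painting the whole stack: it gives a measured high-water mark to compare against the offline estimate.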

One can take advantage of the static task model: it is possible to predict offline the deepest call chain within any task. The -fstack-usage compiler flag will create .su files indicating the stack usage, in bytes, of every function within a module. This is an example of the results for a compilation unit:

core/src/ksynch.c:61:8:kSignalGet	112	static
core/src/ksynch.c:163:8:kSignalSet	72	static
core/src/ksynch.c:214:8:kSignalClear	40	static
core/src/ksynch.c:237:8:kSignalQuery	56	static
core/src/ksynch.c:260:8:kEventInit	40	static
core/src/ksynch.c:279:8:kEventSleep	112	static
core/src/ksynch.c:343:8:kEventWake	88	static
core/src/ksynch.c:383:8:kEventSignal	64	static
core/src/ksynch.c:412:7:kEventQuery	16	static
core/src/ksynch.c:429:8:kSemaInit	88	static
core/src/ksynch.c:470:8:kSemaPend	96	static
core/src/ksynch.c:555:8:kSemaPost	88	static
core/src/ksynch.c:603:8:kSemaWake	72	static
core/src/ksynch.c:640:5:kSemaQuery	40	static
core/src/ksynch.c:662:8:kMutexInit	16	static
core/src/ksynch.c:681:8:kMutexLock	120	static
core/src/ksynch.c:761:8:kMutexUnlock	96	static
core/src/ksynch.c:834:6:kMutexQuery	56	static

These are the worst cases. Now, you identify the depth of the longest chain of calls for a task using these services and add a generous safety margin, say 30%. The cap depends on your budget.
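
The arithmetic can be sketched as a small helper. stackBudget() is a hypothetical name, and the rounding to a multiple of 8 bytes is an assumption about the target's stack alignment. In the usage below, 120 is the kMutexLock figure from the listing, while 48 is a made-up frame size for the task's own entry function; frames of any functions further down the chain must be added too.

```c
#include <assert.h>
#include <stddef.h>

/* Sum the frame sizes (bytes, from the .su files) along the deepest
   call chain of a task, add a percentage margin and round up to a
   multiple of 8 bytes (stack alignment assumed here). */
static size_t stackBudget(const size_t *frames, size_t nFrames, unsigned marginPct)
{
    size_t sum = 0;
    assert(frames != NULL);
    for (size_t i = 0; i < nFrames; i++) sum += frames[i];
    /* margin is rounded up to the next byte */
    size_t total = sum + (sum * marginPct + 99u) / 100u;
    return (total + 7u) & ~(size_t)7u;
}
```

For the chain {48, 120} and a 30% margin: 168 bytes plus 51 bytes of margin is 219, rounded up to 224 bytes.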

Importantly, you also have to size the System Stack. Its initial size is defined in linker.ld, by the symbol Min_Stack_Size. In this case, you need to account for the depth of main(), kApplicationInit(), and all interrupt handlers. Again, inspect the longest call chain depth. Assume interrupts will always add to the worst static depth, and make sure to account for nested interrupts.

4.3. Deadlocks

There are deadlock recovery mechanisms in the literature; a pity they are unfeasible here. The kernel provides bounded waiting, enforces a priority discipline on every waiting queue, and applies priority inheritance to mutexes and message passing. Besides, it provides lock-free primitives and compensates for time drift if a period is enforced on tasks. Well, none of these techniques can prevent deadlocks (right, with bounded blocking and lock-free primitives one can get livelocks instead).

  • Ordered Locking:

For those programming the application, despite following the RMS rule of higher priority for higher request rate tasks, the golden rule for locking is acquiring resources in a unidirectional order throughout the entire application:

acquire(A);
acquire(B);
acquire(C);
   .
   .
   .
release(C);
release(B);
release(A);

This breaks circular waiting.

For instance:

TaskA:
   wait(R1);
   wait(R2);
    /* critical section */
   signal(R2);
   signal(R1);

TaskB:
   wait(R1);
   wait(R2);
    /* critical section */
   signal(R2);
   signal(R1);

But, if:

TaskA:
    wait(R1);
    wait(R2);
    .
    .

TaskB:
    wait(R2);
    wait(R1);
    .
    .

There are some possible outcomes:

  1. Deadlock:

    • TaskA runs: acquires R1

    • TaskB runs: acquires R2

    • TaskA runs: tries to acquire R2 — blocked

    • TaskB runs: tries to acquire R1 — blocked

  2. No deadlock:

    • TaskA runs: acquires R1

    • TaskA runs: acquires R2 (nobody is holding R2)

    • TaskA releases both; TaskB runs and acquires both (in either order)

Overall, there is no deadlock if tasks do not overlap in critical sections. That is why systems run for years without deadlocks and eventually: ploft.
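One way to make the ordering rule checkable during development is to give every lock a rank and refuse any acquisition that does not go up in rank. The sketch below is a stand-alone, single-threaded model: RankedLock, HeldSet, rankedAcquire and rankedRelease are hypothetical names, not RK0 services, and the "lock" is only a flag. In an application, the wrappers would call the real mutex services and keep one HeldSet per task.

```c
#include <assert.h>

#define MAX_HELD 8

typedef struct { int rank; int held; } RankedLock;       /* stand-in for a mutex */
typedef struct { int ranks[MAX_HELD]; int depth; } HeldSet; /* one per task */

/* Returns 0 on success, -1 if the acquisition would break the global order. */
static int rankedAcquire(HeldSet *hs, RankedLock *l)
{
    if (hs->depth == MAX_HELD)
        return -1;
    /* the new lock must rank above the last one this task acquired */
    if (hs->depth > 0 && hs->ranks[hs->depth - 1] >= l->rank)
        return -1;
    l->held = 1;
    hs->ranks[hs->depth++] = l->rank;
    return 0;
}

/* Releases are accepted only in the reverse order of acquisition. */
static int rankedRelease(HeldSet *hs, RankedLock *l)
{
    if (hs->depth == 0 || hs->ranks[hs->depth - 1] != l->rank)
        return -1;
    l->held = 0;
    hs->depth--;
    return 0;
}
```

With R1 ranked below R2, the TaskB variant that takes R2 first and then R1 is rejected at its second acquisition, on every run, instead of deadlocking once in a few years.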

  • Use a 'master-lock' with 'try' semantics:

Another technique, when one needs to acquire multiple locks, is to acquire them all or none using a try-lock (RK_NO_WAIT). If any try fails, the task backs off: it releases every lock it has already acquired and retries later (most simply, using a sleep queue). That is easier said than done, though, and, as mentioned, if not done well, one gets livelocks instead of deadlocks.

(Livelocks are when a couple of tasks keep running, but the program does not advance.)
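The all-or-none idea can be sketched with a stand-in lock. TryLock, tryLock, unlock and acquireAllOrNone below are hypothetical names used only for this model; in RK0 terms the try would presumably be a lock call issued with RK_NO_WAIT. The return convention (0 for success, a positive value for an unsuccessful try) mirrors the one described for kernel calls.

```c
#include <assert.h>
#include <stddef.h>

typedef struct { int locked; } TryLock;   /* stand-in for a mutex */

/* 0 = acquired, 1 = busy (an unsuccessful try, not an error) */
static int tryLock(TryLock *l)
{
    if (l->locked)
        return 1;
    l->locked = 1;
    return 0;
}

static void unlock(TryLock *l)
{
    l->locked = 0;
}

/* Acquire every lock in the set, or none of them. */
static int acquireAllOrNone(TryLock *const locks[], size_t n)
{
    for (size_t i = 0; i < n; i++)
    {
        if (tryLock(locks[i]) != 0)
        {
            /* back off: release what was taken, in reverse order */
            while (i > 0)
                unlock(locks[--i]);
            return 1;
        }
    }
    return 0;
}
```

The caller decides what to do on a positive return. Retrying immediately in a tight loop is what produces livelocks, so some pause between attempts is needed; its length is left out of this sketch.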

5. RK0 Services API

This section describes the kernel service calls.

Convention

  • A kernel call starts with a lowercase k. Typically it is followed by a kernel object identifier and an action.

kSemaPend(&sema, 800); /* pend on a semaphore; 800 ticks time-out */

  • When k is followed by an action, it is acting on the caller task.

kSleep(150); /* sleep-delay the caller task for 150 ticks */

  • Some calls can act either on the caller or on another task:

/* stores the signal flags of the task identified by task1Handle on queryValue */
kSignalQuery(task1Handle, &queryValue);

/* retrieves its own signal flags */
kSignalQuery(NULL, &queryValue);

Return Values

With a few exceptions, kernel calls return an RK_ERR error code. 0 is a successful operation (RK_SUCCESS) and any negative value is an error that indicates failure. A positive value is an unsuccessful operation that will not lead the system to failure (e.g., any unsuccessful try operation).

These return codes are defined in kcommondefs.h.
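A minimal sketch of how application code might sort return values into these three classes. It assumes the code fits in a signed int; CallOutcome and classifyReturn are hypothetical names, and the actual codes remain those in kcommondefs.h.

```c
#include <assert.h>

typedef enum
{
    CALL_OK,            /* 0: operation succeeded                  */
    CALL_UNSUCCESSFUL,  /* > 0: e.g. a try that found nothing      */
    CALL_FAILED         /* < 0: an error that indicates failure    */
} CallOutcome;

/* Hypothetical helper: maps a kernel return value to its class. */
static CallOutcome classifyReturn(int err)
{
    if (err == 0)
        return CALL_OK;
    return (err > 0) ? CALL_UNSUCCESSFUL : CALL_FAILED;
}
```

A caller would typically retry or move on after CALL_UNSUCCESSFUL and treat CALL_FAILED as a programming or system fault.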

5.1. Task Management

5.1.1. kCreateTask

Description

Assembles a declared task.

Signature
RK_ERR kCreateTask( RK_TASK_HANDLE *taskHandlePtr,
		const RK_TASKENTRY taskFunc, VOID *argsPtr,
		CHAR *const taskName, RK_STACK *const stackAddrPtr,
		const UINT stackSize, const RK_PRIO priority,
		const BOOL preempt);
Parameters
  • taskHandlePtr:: Pointer to the Handle object for the task.

  • taskFunc:: Task’s entry function. Prototype: VOID taskFunc(VOID *)

  • argsPtr:: Pointer to initial task arguments. (opt. RK_NO_ARGS)

  • taskName:: Task name (max default 8 bytes, set in RK_NAME macro).

  • stackAddrPtr:: Pointer to the task stack (the array’s name).

  • stackSize:: Size of the task stack (in WORDS; 1 WORD = 4 BYTES).

  • priority:: Task priority, valid range: 0-31.

  • preempt:: Values: RK_PREEMPT / RK_NO_PREEMPT. If RK_NO_PREEMPT, the task, once dispatched, is not preempted by user tasks until it leaves the running state by itself (going to READY or WAITING). It can still be interrupted by hardware interrupts and then resumed. This is meant for tasks such as deferred ISR handlers, which are supposed to be exceptional.

Helper
RK_DECLARE_TASK(handleName, taskFunc, stackAddr, stackSizeWords)

Declare the objects needed to assemble a task.

5.1.2. kInit

Description

Initialises the kernel. This will be called in main() after hardware initialisation.

Signature
VOID kInit(VOID);

5.1.3. kYield

Description

The current task yields the processor, which is essential when round-robining between tasks with the same priority if there are no other blocking calls.

Signature
VOID kYield(VOID);

5.1.4. kSchLock

Description

Disables preemption for the current task until kSchUnlock() is issued.

Signature
VOID kSchLock(VOID);

5.1.5. kSchUnlock

Description

Restores the preemption status. To be called at the end of the operation guarded by kSchLock().

Signature
VOID kSchUnlock(VOID);

5.2. Signals

5.2.1. kSignalGet

A task pends on its own event flags.

Signature
RK_ERR kSignalGet(ULONG const required,
                  UINT const options,
                  ULONG *const gotFlagsPtr,
                  RK_TICK const timeout);
Parameters
  • required:: Combination of required flags (bitstring, non-zero).

  • options:: RK_FLAGS_ANY or RK_FLAGS_ALL.

  • gotFlagsPtr:: Pointer to store returned flags (optional, NULL allowed).

  • timeout:: Suspension timeout if required flags are not met.
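The difference between the two options can be written as a predicate over the task's current flags. This is a model of the condition only; the enum values are stand-ins (the real RK_FLAGS_ANY and RK_FLAGS_ALL values are not assumed here), and whether the matched flags are cleared on return is not modelled.

```c
#include <assert.h>

/* Stand-ins for RK_FLAGS_ANY / RK_FLAGS_ALL; values are hypothetical. */
enum { MODEL_FLAGS_ANY = 0, MODEL_FLAGS_ALL = 1 };

/* Returns 1 if 'current' satisfies 'required' under the given option. */
static int flagsSatisfied(unsigned long current, unsigned long required,
                          int option)
{
    if (required == 0)
        return 0;                               /* required must be non-zero */
    if (option == MODEL_FLAGS_ALL)
        return (current & required) == required; /* every required bit set   */
    return (current & required) != 0;            /* at least one bit set     */
}
```

With current flags 0x5 and required 0x3, ANY is satisfied (bit 0 is set) while ALL is not (bit 1 is missing), so under ALL the caller would keep waiting until the timeout.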

5.2.2. kSignalSet

Posts a combination of flags to a task.

Signature
RK_ERR kSignalSet(RK_TASK_HANDLE const taskHandle, ULONG const mask);
Parameters
  • taskHandle:: Receiver Task handle.

  • mask:: Bitmask to signal (non-zero).

5.2.3. kSignalQuery

Retrieves a task’s signal flags.

Signature
RK_ERR kSignalQuery(RK_TASK_HANDLE const taskHandle, ULONG *const gotFlagsPtr);
Parameters
  • taskHandle:: Target task; use NULL if target is caller task.

  • gotFlagsPtr:: Pointer to store current flags.

5.2.4. kSignalClear

Clears the caller task flags.

Signature
RK_ERR kSignalClear(VOID);

5.3. Events (Sleep Queues)

5.3.1. kEventInit

Initialises an event object.

Signature
RK_ERR kEventInit(RK_EVENT *const kobj);
Parameters
  • kobj:: Pointer to RK_EVENT object.

5.3.2. kEventSleep

Suspends a task waiting for a wake signal.

Signature
RK_ERR kEventSleep(RK_EVENT *const kobj, const RK_TICK timeout);
Parameters
  • kobj:: Pointer to RK_EVENT object.

  • timeout:: Suspension time.

5.3.3. kEventWake

Broadcast signal for an event.

Signature
RK_ERR kEventWake(RK_EVENT *const kobj, UINT nTasks, UINT *uTasksPtr);
Parameters
  • kobj:: Event address.

  • nTasks:: Number of tasks to wake (0 = all).

  • uTasksPtr:: (Optional) Pointer to store number of unreleased tasks, if any.

Helper

Wakes all tasks.

RK_ERR kEventFlush(RK_EVENT *const kobj);

5.3.4. kEventSignal

Wakes a single task sleeping for a specific event (by priority).

Signature
RK_ERR kEventSignal(RK_EVENT *const kobj);
Parameters
  • kobj:: Pointer to RK_EVENT object.

5.3.5. kEventQuery

Description

Retrieves the number of tasks sleeping on an event.

Signature
RK_ERR kEventQuery( RK_EVENT const * const kobj, ULONG *const nTasksPtr);
Parameters
  • kobj:: Pointer to RK_EVENT object.

  • nTasksPtr:: Pointer to store the retrieved number of tasks.

5.4. Semaphores (Counting/Binary)

5.4.1. kSemaInit

Initialises a semaphore.

Signature
RK_ERR kSemaInit(RK_SEMA *const kobj, UINT const semaType, const UINT value);
Helpers
RK_ERR kSemaCountInit(RK_SEMA *const kobj, const UINT value);

RK_ERR kSemaBinInit(RK_SEMA *const kobj, const UINT value);
Parameters
  • kobj:: Semaphore address.

  • semaType:: Counting(RK_SEMA_COUNT) or Binary (RK_SEMA_BIN).

  • value:: Initial value (>= 0).

5.4.2. kSemaPend

Waits on a semaphore.

Signature
RK_ERR kSemaPend(RK_SEMA *const kobj, const RK_TICK timeout);
Alias
RK_ERR kSemaWait(RK_SEMA *const kobj, const RK_TICK timeout);
Parameters
  • kobj:: Semaphore address.

  • timeout:: Maximum suspension time.

5.4.3. kSemaPost

Signals a semaphore.

Signature
RK_ERR kSemaPost(RK_SEMA *const kobj);
Alias
RK_ERR kSemaSignal(RK_SEMA *const kobj);
Parameters
  • kobj:: Semaphore address.

5.4.4. kSemaWake

Broadcast signal to a semaphore.

Signature
RK_ERR kSemaWake(RK_SEMA *const kobj, UINT nTasks, UINT *uTasksPtr);
Parameters
  • kobj:: Semaphore address.

  • nTasks:: Number of tasks to wake (0 = all).

  • uTasksPtr:: (Optional) Pointer to store number of unreleased tasks, if any.

Helper:

Wakes ALL tasks

RK_ERR kSemaFlush(RK_SEMA *const kobj);

5.4.5. kSemaQuery

Retrieves the count value of a semaphore. A negative value is the number of blocked tasks. A non-negative value is the semaphore’s counter number.

Signature
RK_ERR kSemaQuery(RK_SEMA const * const kobj, INT *const countPtr);
Parameters
  • kobj:: Semaphore address.

  • countPtr:: Pointer to store the retrieved value.
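Since one signed value carries two meanings, a small decoder keeps call sites readable. SemaView and semaInterpret are hypothetical names for this sketch; the input is the value that kSemaQuery stores through countPtr.

```c
#include <assert.h>

typedef struct
{
    unsigned tokens;        /* available count, when non-negative */
    unsigned blockedTasks;  /* waiting tasks, when negative       */
} SemaView;

/* Hypothetical helper: splits the queried value into its two readings. */
static SemaView semaInterpret(int count)
{
    SemaView v = { 0u, 0u };

    if (count < 0)
        v.blockedTasks = 0u - (unsigned)count; /* magnitude, overflow-safe */
    else
        v.tokens = (unsigned)count;
    return v;
}
```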

5.5. Mutex Semaphore

5.5.1. kMutexInit

Initialises a mutex.

Signature
RK_ERR kMutexInit(RK_MUTEX *const kobj, UINT const prioInh);
Parameters
  • kobj:: Mutex address.

  • prioInh:: Priority inheritance. RK_INHERIT / RK_NO_INHERIT

5.5.2. kMutexLock

Locks a mutex.

Signature
RK_ERR kMutexLock(RK_MUTEX *const kobj, BOOL const prioInh, RK_TICK const timeout);
Parameters
  • kobj:: Mutex address.

  • prioInh:: Apply priority inheritance (RK_INHERIT / RK_NO_INHERIT).

  • timeout:: Maximum suspension time.

5.5.3. kMutexUnlock

Unlocks a mutex.

Signature
RK_ERR kMutexUnlock(RK_MUTEX *const kobj);
Parameters
  • kobj:: Mutex address.

5.5.4. kMutexQuery

Retrieves the state of a mutex.

Signature
RK_ERR kMutexQuery( RK_MUTEX const *const kobj, UINT *const statePtr);
Parameters
  • kobj:: Mutex address.

  • statePtr:: Pointer to store the state (0 unlocked, 1 locked).

5.6. Condition Variables

Helpers for managing Condition Variables using the RK_EVENT service.

5.6.1. kCondVarWait

Unlocks the associated mutex and sleeps waiting for a wake signal.

Signature
RK_ERR kCondVarWait(RK_EVENT *const eventPtr, RK_MUTEX *const mutexPtr, RK_TICK timeout);
Parameters
  • eventPtr:: Event object composing the condition variable.

  • mutexPtr:: Mutex that is locked for testing the condition.

  • timeout:: Timeout for both waiting on the event and locking the mutex.

5.6.2. kCondVarSignal

Same as kEventSignal().

Signature
RK_ERR kCondVarSignal(RK_EVENT *const eventPtr);

5.6.3. kCondVarBroadcast

Same as kEventFlush().

Signature
RK_ERR kCondVarBroadcast(RK_EVENT *const eventPtr);

5.7. Mailbox

5.7.1. kMboxInit

Initialises an indirect single mailbox.

Signature
RK_ERR kMboxInit(RK_MBOX *const kobj, VOID *const initMailPtr);
Parameters
  • kobj:: Mailbox address.

  • initMailPtr:: Initial mail address if initialising full; else NULL.

5.7.2. kMboxSetOwner

Assigns a task owner for the mailbox.

Signature
RK_ERR kMboxSetOwner(RK_MBOX *const kobj, const RK_TASK_HANDLE taskHandle);
Parameters
  • kobj:: Mailbox address.

  • taskHandle:: Task handle.

5.7.3. kMboxPost

Sends to a mailbox.

Signature
RK_ERR kMboxPost(RK_MBOX *const kobj, VOID *sendPtr, RK_TICK const timeout);
Parameters
  • kobj:: Mailbox address.

  • sendPtr:: Mail address.

  • timeout:: Suspension timeout.

5.7.4. kMboxPend

Receives from a mailbox.

Signature
RK_ERR kMboxPend(RK_MBOX *const kobj, VOID **recvPPtr, RK_TICK const timeout);
Parameters
  • kobj:: Mailbox address.

  • recvPPtr:: Pointer to store message address (pointer-to-pointer).

  • timeout:: Suspension timeout.

5.7.5. kMboxPeek

Description

Reads the mail without extracting it.

Signature
RK_ERR kMboxPeek(RK_MBOX *const kobj, VOID **peekPPtr);
Parameters
  • kobj:: Mailbox address.

  • peekPPtr:: Pointer to receive mail address.

5.7.6. kMboxQuery

Description

Retrieves the state of a mailbox.

Signature
RK_ERR kMboxQuery( RK_MBOX const * const kobj, UINT *const statePtr);
Parameters
  • kobj:: Mailbox address.

  • statePtr:: Pointer to store the state (1=FULL, 0=EMPTY)

5.7.7. kMboxPostOvw

Description

Posts to a mailbox overwriting current mail, if any.

Signature
RK_ERR kMboxPostOvw(RK_MBOX *const kobj, VOID *sendPtr);
Parameters
  • kobj:: Mailbox address.

  • sendPtr:: Mail address.
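The four mailbox operations differ only in how they treat the single slot. The model below is a non-blocking sketch, not the kernel's implementation: Mbox and the mb* functions are hypothetical names, a NULL slot stands for EMPTY, and where the real calls would suspend the caller up to the timeout, the model simply returns 1.

```c
#include <assert.h>
#include <stddef.h>

typedef struct { void *mail; } Mbox;   /* NULL slot = EMPTY */

/* Post: succeeds only if the mailbox is empty. */
static int mbPost(Mbox *m, void *sendPtr)
{
    if (m->mail != NULL)
        return 1;
    m->mail = sendPtr;
    return 0;
}

/* Post with overwrite: replaces the current mail, if any. */
static void mbPostOvw(Mbox *m, void *sendPtr)
{
    m->mail = sendPtr;
}

/* Peek: reads the mail address and leaves the mailbox full. */
static int mbPeek(const Mbox *m, void **peekPPtr)
{
    if (m->mail == NULL)
        return 1;
    *peekPPtr = m->mail;
    return 0;
}

/* Pend: reads the mail address and empties the mailbox. */
static int mbPend(Mbox *m, void **recvPPtr)
{
    if (m->mail == NULL)
        return 1;
    *recvPPtr = m->mail;
    m->mail = NULL;
    return 0;
}
```

Only the address travels through the mailbox, so the sender must keep the pointed-to message valid until the receiver is done with it.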

5.8. Mail Queues

5.8.1. kQueueInit

Description

Initialises a mail queue.

Signature
RK_ERR kQueueInit(RK_QUEUE *const kobj, VOID *bufPtr, const ULONG maxItems);
Parameters
  • kobj:: Mail Queue address.

  • bufPtr:: Pointer to buffer storing mail addresses.

  • maxItems:: Maximum number of mails.

5.8.2. kQueueSetOwner

Description

Assigns a task owner for the queue.

Signature
RK_ERR kQueueSetOwner(RK_QUEUE *const kobj, const RK_TASK_HANDLE taskHandle);
Parameters
  • kobj:: Mail Queue address.

  • taskHandle:: Task handle.

5.8.3. kQueuePost

Description

Sends to a mail queue.

Signature
RK_ERR kQueuePost(RK_QUEUE *const kobj, VOID *sendPtr, RK_TICK const timeout);
Parameters
  • kobj:: Mail Queue address.

  • sendPtr:: Mail address.

  • timeout:: Suspension timeout.

5.8.4. kQueuePend

Receives from a mail queue.

Signature
RK_ERR kQueuePend(RK_QUEUE *const kobj, VOID **recvPPtr, RK_TICK const timeout);
Parameters
  • kobj:: Mail Queue address.

  • recvPPtr:: Pointer to store message address (pointer-to-pointer).

  • timeout:: Suspension timeout.

5.8.5. kQueuePeek

Description

Reads the head mail without extracting.

Signature
RK_ERR kQueuePeek(RK_QUEUE *const kobj, VOID **peekPPtr);
Parameters
  • kobj:: Mail Queue address.

  • peekPPtr:: Pointer to receive address.

5.8.6. kQueueJam

Description

Sends a message to the queue front.

Signature
RK_ERR kQueueJam(RK_QUEUE *const kobj, VOID *sendPtr, RK_TICK const timeout);
Parameters
  • kobj:: Queue address.

  • sendPtr:: Message address.

  • timeout:: Suspension time.
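How a jam differs from a post is easiest to see on a ring of mail addresses. This is a non-blocking model with hypothetical names (PtrQueue, qPost, qJam, qPend) and a fixed capacity; it is a sketch of the queue discipline, not the kernel's code, and a return of 1 stands where the real call would suspend the caller.

```c
#include <assert.h>
#include <stddef.h>

#define QCAP 4u

typedef struct
{
    void  *slot[QCAP];
    size_t head;    /* index of the next mail to be received */
    size_t count;   /* mails currently stored                */
} PtrQueue;

/* Post: append at the tail (FIFO order). */
static int qPost(PtrQueue *q, void *mail)
{
    if (q->count == QCAP)
        return 1;
    q->slot[(q->head + q->count) % QCAP] = mail;
    q->count++;
    return 0;
}

/* Jam: insert at the front, so it is the next mail received. */
static int qJam(PtrQueue *q, void *mail)
{
    if (q->count == QCAP)
        return 1;
    q->head = (q->head + QCAP - 1u) % QCAP;  /* step head back, wrapping */
    q->slot[q->head] = mail;
    q->count++;
    return 0;
}

/* Pend: remove from the front. */
static int qPend(PtrQueue *q, void **recvPPtr)
{
    if (q->count == 0)
        return 1;
    *recvPPtr = q->slot[q->head];
    q->head = (q->head + 1u) % QCAP;
    q->count--;
    return 0;
}
```

Posting a, posting b, and then jamming c yields the receive order c, a, b.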

5.8.7. kQueueQuery

Description

Retrieves number of mails within queue.

Signature
RK_ERR kQueueQuery(RK_QUEUE const * const kobj, UINT *const nMailPtr);
Parameters
  • kobj:: Mail Queue address.

  • nMailPtr:: Pointer to store the retrieved number.

5.9. Stream Queue

5.9.1. kStreamInit

Description

Initialises a Stream Message Queue.

Signature
RK_ERR kStreamInit(RK_STREAM *const kobj, VOID *bufPtr,
                   const ULONG mesgSizeInWords, const ULONG nMesg);
Parameters
  • kobj:: Stream Queue address.

  • bufPtr:: Allocated memory.

  • mesgSizeInWords:: Message size (min = 1 WORD).

  • nMesg:: Max number of messages.
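A sketch of how the buffer for a stream queue might be dimensioned, assuming the kernel expects room for mesgSizeInWords × nMesg words (this product is my reading of the parameters, not a documented formula). WORDS_FOR, N_MESG, SensorMesg and streamBuf are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Size of a type in 4-byte WORDS, rounded up. */
#define WORDS_FOR(type) ((sizeof(type) + 3u) / 4u)
#define N_MESG 8u

/* Hypothetical message type, used only for this example. */
typedef struct
{
    uint16_t id;
    uint8_t  payload[9];
} SensorMesg;

/* Backing storage: one slot of WORDS_FOR(SensorMesg) words per message. */
static uint32_t streamBuf[WORDS_FOR(SensorMesg) * N_MESG];

/* Number of words in the backing storage. */
static size_t streamBufWords(void)
{
    return sizeof streamBuf / sizeof streamBuf[0];
}
```

Here WORDS_FOR(SensorMesg) would be passed as mesgSizeInWords and N_MESG as nMesg. If messages are copied word by word, it is safer to declare message types whose size is already a whole number of words.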

5.9.2. kStreamSetOwner

Description

Assigns a task owner for the stream queue.

Signature
RK_ERR kStreamSetOwner(RK_STREAM *const kobj, const RK_TASK_HANDLE taskHandle);
Parameters
  • kobj:: Stream Queue address.

  • taskHandle:: Task handle.

5.9.3. kStreamSend

Description

Sends a message to a message queue.

Signature
RK_ERR kStreamSend(RK_STREAM *const kobj, VOID *sendPtr, const RK_TICK timeout);
Parameters
  • kobj:: Queue address.

  • sendPtr:: Message address.

  • timeout:: Suspension time.

5.9.4. kStreamRecv

Description

Receives a message from the queue.

Signature
RK_ERR kStreamRecv(RK_STREAM *const kobj, VOID *recvPtr, const RK_TICK timeout);
Parameters
  • kobj:: Queue address.

  • recvPtr:: Receiving address.

  • timeout:: Suspension time.

5.9.5. kStreamPeek

Description

Receives front message without changing state.

Signature
RK_ERR kStreamPeek(RK_STREAM const * const kobj, VOID *recvPtr);
Parameters
  • kobj:: Stream Queue object.

  • recvPtr:: Receiving pointer.

5.9.6. kStreamJam

Description

Sends message to queue front.

Signature
RK_ERR kStreamJam(RK_STREAM *const kobj, VOID *sendPtr, const RK_TICK timeout);
Parameters
  • kobj:: Stream Queue address.

  • sendPtr:: Message address.

  • timeout:: Suspension time.

5.9.7. kStreamQuery

Description

Retrieves number of messages in a stream queue.

Signature
RK_ERR kStreamQuery(RK_STREAM const * const kobj, UINT *const nMesgPtr);
Parameters
  • kobj:: Stream Queue address.

  • nMesgPtr:: Pointer to store the number of messages

5.10. Most-Recent Message Protocol (MRM)

5.10.1. kMRMInit

Description

Initialises an MRM Control Block.

Signature
RK_ERR kMRMInit(RK_MRM *const kobj, RK_MRM_BUF *const mrmPoolPtr,
                VOID *mesgPoolPtr, ULONG const nBufs, ULONG const dataSizeWords);
Parameters
  • kobj:: Pointer to MRM Control Block.

  • mrmPoolPtr:: Pool of MRM buffers.

  • mesgPoolPtr:: Pool of message buffers.

  • nBufs:: Number of MRM Buffers (= number of messages).

  • dataSizeWords:: Size of message in WORDS.

5.10.2. kMRMReserve

Description

Reserves an MRM Buffer to be written.

Signature
RK_MRM_BUF* kMRMReserve(RK_MRM *const kobj);
Parameters
  • kobj:: Pointer to MRM Control Block.

5.10.3. kMRMPublish

Description

Copies a message into MRM and makes it most recent.

Signature
RK_ERR kMRMPublish(RK_MRM *const kobj, RK_MRM_BUF *const bufPtr, VOID const *dataPtr);
Parameters
  • kobj:: Pointer to MRM Control Block.

  • bufPtr:: Pointer to MRM Buffer.

  • dataPtr:: Pointer to message to publish.

5.10.4. kMRMGet

Description

Receives most recent message within MRM Block.

Signature
RK_MRM_BUF* kMRMGet(RK_MRM *const kobj, VOID *getMesgPtr);
Parameters
  • kobj:: Pointer to MRM Control Block.

  • getMesgPtr:: Pointer where message will be copied.

Returns

Pointer to MRM Buffer (to be used later with kMRMUnget). NULL if error.

5.10.5. kMRMUnget

Description

Releases an MRM Buffer after message consumption.

Signature
RK_ERR kMRMUnget(RK_MRM *const kobj, RK_MRM_BUF *const bufPtr);
Parameters
  • kobj:: Pointer to MRM Control Block.

  • bufPtr:: Pointer to MRM Buffer (from kMRMGet).
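The reserve, publish, get, unget cycle can be modelled in the style of the cyclic asynchronous buffers found in the HARTIK literature. Everything below is an assumption made for illustration: the names (Mrm, MrmBuf, mrm*), the int payload, and the rule that a buffer returns to the pool once it is neither the most recent one nor in use by a reader. It is not the kernel's implementation.

```c
#include <assert.h>
#include <stddef.h>

#define N_BUFS 3

typedef struct { int data; int users; int free; } MrmBuf;
typedef struct { MrmBuf buf[N_BUFS]; MrmBuf *current; } Mrm;

static void mrmInit(Mrm *m)
{
    for (int i = 0; i < N_BUFS; i++)
    {
        m->buf[i].data = 0;
        m->buf[i].users = 0;
        m->buf[i].free = 1;
    }
    m->current = NULL;
}

/* Writer: take a free buffer to write into (NULL if none). */
static MrmBuf *mrmReserve(Mrm *m)
{
    for (int i = 0; i < N_BUFS; i++)
    {
        if (m->buf[i].free)
        {
            m->buf[i].free = 0;
            return &m->buf[i];
        }
    }
    return NULL;
}

/* Writer: store the message and make this buffer the most recent. */
static void mrmPublish(Mrm *m, MrmBuf *b, int data)
{
    b->data = data;
    if (m->current != NULL && m->current != b && m->current->users == 0)
        m->current->free = 1;   /* old message has no readers: recycle */
    m->current = b;
}

/* Reader: copy the most recent message and pin its buffer. */
static MrmBuf *mrmGet(Mrm *m, int *out)
{
    if (m->current == NULL)
        return NULL;
    m->current->users++;
    *out = m->current->data;
    return m->current;
}

/* Reader: unpin; recycle the buffer if it is stale and unused. */
static void mrmUnget(Mrm *m, MrmBuf *b)
{
    if (b->users > 0)
        b->users--;
    if (b->users == 0 && b != m->current)
        b->free = 1;
}
```

In this model a reader never blocks a writer: a slow reader keeps its (possibly stale) buffer pinned while newer messages are published into other buffers.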

5.11. Time, Application Timers and Delays

5.11.1. kTickGet

Description

Gets current system tick count.

Signature
RK_TICK kTickGet(VOID);
Returns

Total ticks since system start-up.

5.11.2. kTickGetMs

Description

Gets the amount of time elapsed since system start-up, in milliseconds.

Signature
RK_TICK kTickGetMs(VOID);
Returns

Time elapsed since system start-up, in milliseconds.
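The relation between ticks and milliseconds depends only on the tick period. A minimal sketch of the arithmetic, assuming the period is known in microseconds and is non-zero; ticksToMs and msToTicks are hypothetical helpers, not kernel calls.

```c
#include <assert.h>
#include <stdint.h>

/* Ticks to milliseconds, truncating. 64-bit intermediate avoids overflow. */
static uint32_t ticksToMs(uint32_t ticks, uint32_t tickPeriodUs)
{
    return (uint32_t)(((uint64_t)ticks * tickPeriodUs) / 1000u);
}

/* Milliseconds to ticks, rounded up so a delay is never shorter than asked. */
static uint32_t msToTicks(uint32_t ms, uint32_t tickPeriodUs)
{
    assert(tickPeriodUs != 0u);
    return (uint32_t)((((uint64_t)ms * 1000u) + tickPeriodUs - 1u)
                      / tickPeriodUs);
}
```

With a 1 ms tick the two units coincide; with a 500 µs tick, 150 ticks are 75 ms and a 1 ms delay needs 2 ticks.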

5.11.3. kTimerInit

Description

Initialises an application timer.

Signature
RK_ERR kTimerInit(RK_TIMER *const kobj, const RK_TICK phase,
                  const RK_TICK countTicks, const RK_TIMER_CALLOUT funPtr,
                  VOID *argsPtr, const BOOL reload);
Parameters
  • kobj:: Timer object address.

  • phase:: Initial phase delay (not applied on reload).

  • countTicks:: Time until expiry in ticks.

  • funPtr:: Callout function when timer expires.

  • argsPtr:: Pointer to callout arguments.

  • reload:: TRUE for reload after timeout; FALSE for one-shot.
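Reading the parameters literally, the phase delays only the first expiry, and a reloading timer then fires every countTicks. The function below spells out that reading as arithmetic; it is my interpretation of the parameter list, and timerExpiryTick is a hypothetical name.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Tick at which the n-th expiry (n >= 1) is expected for a timer armed at
 * 'armedAt'. For a one-shot timer only n == 1 is meaningful.
 */
static uint32_t timerExpiryTick(uint32_t armedAt, uint32_t phase,
                                uint32_t countTicks, uint32_t n)
{
    assert(n >= 1u);
    /* phase is applied once; each reload adds another countTicks */
    return armedAt + phase + n * countTicks;
}
```

For phase = 10 and countTicks = 100, armed at tick 0, the expiries would fall on ticks 110, 210, 310, and so on.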

5.11.4. kTimerCancel

Description

Cancels an active timer.

Signature
RK_ERR kTimerCancel(RK_TIMER *const kobj);
Parameters
  • kobj:: Timer object address.

5.11.5. kSleep

Description

Puts current task to sleep for a number of ticks.

Signature
RK_ERR kSleep(const RK_TICK ticks);
Parameters
  • ticks:: Number of ticks to sleep.

5.11.6. kSleepUntil

Description

Sleeps, compensating for any time drift between activations.

Signature
RK_ERR kSleepUntil(RK_TICK const absTicks);
Parameters
  • absTicks:: Absolute interval in ticks.
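Why drift compensation matters can be shown with a small simulation of a periodic task whose job takes a variable number of ticks. wakeRelative models a plain delay issued after each job; wakeAbsolute models releases anchored to the previous release time. Both functions are hypothetical models of the idea, not kernel code, and the overrun rule (continue immediately if a release was missed) is an assumption.

```c
#include <assert.h>
#include <stddef.h>

/* Relative delay: each sleep starts when the job finishes, so the
 * execution time of every job is added to the period. */
static unsigned long wakeRelative(const unsigned *execTicks, size_t n,
                                  unsigned period)
{
    unsigned long now = 0;

    for (size_t i = 0; i < n; i++)
        now += execTicks[i] + period;
    return now;
}

/* Absolute release: the next release is the previous release plus the
 * period, regardless of how long the job took. */
static unsigned long wakeAbsolute(const unsigned *execTicks, size_t n,
                                  unsigned period)
{
    unsigned long now = 0, next = 0;

    for (size_t i = 0; i < n; i++)
    {
        now += execTicks[i];   /* job runs                      */
        next += period;        /* release time stays on a grid  */
        if (next > now)
            now = next;        /* sleep until the release time  */
    }
    return now;
}
```

With jobs of 3, 5 and 2 ticks and a period of 10, the relative version wakes for the fourth activation at tick 40, the absolute one at tick 30.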

5.11.7. kBusyWait

Description

Active wait (busy wait) for a number of ticks.

Signature
RK_ERR kBusyWait(RK_TICK const ticks);
Parameters
  • ticks:: Number of ticks to wait.

5.12. Memory Pool (Allocator)

5.12.1. kMemInit

Description

Initialises a memory pool control block.

Signature
RK_ERR kMemInit(RK_MEM *const kobj, VOID *memPoolPtr, ULONG blkSize, const ULONG numBlocks);
Parameters
  • kobj:: Pointer to pool control block.

  • memPoolPtr:: Address of memory pool (array of objects).

  • blkSize:: Size of each block (bytes).

  • numBlocks:: Number of blocks.

5.12.2. kMemAlloc

Description

Allocates memory block from pool.

Signature
VOID *kMemAlloc(RK_MEM *const kobj);
Parameters
  • kobj:: Pointer to block pool.

Returns

Pointer to allocated block, or NULL on failure.

5.12.3. kMemFree

Description

Frees a memory block back to pool.

Signature
RK_ERR kMemFree(RK_MEM *const kobj, VOID *blockPtr);
Parameters
  • kobj:: Pointer to block pool.

  • blockPtr:: Pointer to block to free.
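A fixed-size block pool is commonly built as a free list threaded through the unused blocks, which makes both allocation and release constant-time. The sketch below illustrates that general technique under my own names (Pool, poolInit, poolAlloc, poolFree); it is not claimed to be how kMemInit, kMemAlloc and kMemFree are implemented, and it omits the critical sections a kernel would need.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef struct { void *freeList; size_t nFree; } Pool;

/* Thread a free list through the blocks; block 0 ends up at the head. */
static int poolInit(Pool *p, void *mem, size_t blkSize, size_t nBlocks)
{
    unsigned char *base = mem;

    if (p == NULL || mem == NULL || blkSize < sizeof(void *) || nBlocks == 0)
        return -1;
    p->freeList = NULL;
    p->nFree = 0;
    for (size_t i = nBlocks; i > 0; i--)
    {
        void *blk = base + (i - 1) * blkSize;
        memcpy(blk, &p->freeList, sizeof(void *)); /* link stored in block */
        p->freeList = blk;
        p->nFree++;
    }
    return 0;
}

/* O(1): pop the head of the free list. */
static void *poolAlloc(Pool *p)
{
    void *blk = p->freeList;

    if (blk != NULL)
    {
        memcpy(&p->freeList, blk, sizeof(void *));
        p->nFree--;
    }
    return blk;
}

/* O(1): push the block back on the free list. */
static int poolFree(Pool *p, void *blk)
{
    if (blk == NULL)
        return -1;
    memcpy(blk, &p->freeList, sizeof(void *));
    p->freeList = blk;
    p->nFree++;
    return 0;
}
```

Neither path contains a loop or a search, so the cost of an allocation does not depend on how many blocks are in use.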

6. Scheduler Determinism

6.1. Preemptive Scheduling

This is a simple test to provide some evidence that the scheduler obeys the preemption criterion: a higher-priority task always preempts a lower-priority task.

6.1.1. Using Direct Signals

Tasks 1, 2, 3, and 4 are in descending order of priority. If the scheduler is well-behaved, we shall see counters differing by "1."

VOID Task1(VOID* args)
{
    RK_UNUSEARGS
	while(1)
	{
		counter1++;
		kPend(RK_WAIT_FOREVER);
	}
}

VOID Task2(VOID* args)
{
    RK_UNUSEARGS
	while(1)
	{
	    kSignal(task1Handle); /* shall immediately be preempted by task1 */
		counter2++;
		kPend(RK_WAIT_FOREVER);    /* suspends again */
	}
}


VOID Task3(VOID* args)
{
    RK_UNUSEARGS
	while(1)
	{
		kSignal(task2Handle);  /* shall immediately be preempted by task2 */
		counter3++;
		kPend(RK_WAIT_FOREVER); /* suspends again */
	}
}

VOID Task4(VOID* args)
{
    RK_UNUSEARGS
	while(1)
	{
	    /* shall immediately be preempted by task3 */
	    kSignal(task3Handle);
	    /* only resumes after all tasks are pending again */
	    counter4++;
	}
}

This is the output after some time running:

signaldet

6.1.2. Using Semaphores

RK_SEMA sema1;
RK_SEMA sema2;
RK_SEMA sema3;
RK_SEMA sema4;

VOID kApplicationInit(VOID)
{
	kSemaInit(&sema1, RK_SEMA_COUNT, 0);
	kSemaInit(&sema2, RK_SEMA_COUNT, 0);
	kSemaInit(&sema3, RK_SEMA_COUNT, 0);
	kSemaInit(&sema4, RK_SEMA_COUNT, 0);

}

VOID Task1(VOID* args)
{
    RK_UNUSEARGS
	while (1)
	{
		counter1++;
		kSemaWait(&sema1, RK_WAIT_FOREVER);
	}
}

VOID Task2(VOID* args)
{
    RK_UNUSEARGS
	while (1)
	{
		kSemaSignal(&sema1);
		counter2++;
		kSemaWait(&sema2, RK_WAIT_FOREVER);
	}
}

VOID Task3(VOID* args)
{
    RK_UNUSEARGS
	while (1)
	{
		kSemaSignal(&sema2);
		counter3++;
		kSemaWait(&sema3, RK_WAIT_FOREVER);
	}
}

VOID Task4(VOID* args)
{
    RK_UNUSEARGS
	while (1)
	{

		kSemaSignal(&sema3);
		counter4++;
	}
}

determsema

Here, the tick is running @ 0.5us

6.1.3. Using Mailboxes

/**************************************************
 * Mailboxes are initialised empty.
 *  kMboxInit(&mbox1, NULL);
 *  kMboxInit(&mbox2, NULL);
 *	kMboxInit(&mbox3, NULL);
 *	kMboxInit(&mbox4, NULL);
 *	kMboxInit(&mbox5, NULL);
 *
 * Highest Prio (Task1), Lowest (Task5)
 *
 * Using Mailboxes to pass tokens as signals.
 *
 **************************************************/


VOID Task1( VOID *args)
{
	RK_UNUSEARGS
	UINT *p;
	while (1)
	{
		counter1++;
		kMboxPend(&mbox1, (VOID*) &p, RK_WAIT_FOREVER);
	}
}

VOID Task2( VOID *args)
{
	RK_UNUSEARGS
	UINT mesg = 1;
	UINT *p;

	while (1)
	{
		kMboxPost(&mbox1, &mesg, RK_WAIT_FOREVER);

		counter2++;

		kMboxPend(&mbox2, (VOID*) &p, RK_WAIT_FOREVER);

	}
}
VOID Task3( VOID *args)
{
	RK_UNUSEARGS
	UINT mesg = 1;
	UINT *p;
	while (1)
	{

		kMboxPost(&mbox2, &mesg, RK_WAIT_FOREVER);

		counter3++;

		kMboxPend(&mbox3, (VOID*) &p, RK_WAIT_FOREVER);

	}
}

VOID Task4( VOID *args)
{
	RK_UNUSEARGS
	UINT mesg = 1;
	UINT *p;
	while (1)
	{

		kMboxPost(&mbox3, &mesg, RK_WAIT_FOREVER);

		counter4++;

		kMboxPend(&mbox4, (VOID*) &p, RK_WAIT_FOREVER);

	}
}
VOID Task5( VOID *args)
{
    RK_UNUSEARGS
    UINT mesg=1;
    while(1)
	{

		kMboxPost(&mbox4, &mesg, RK_WAIT_FOREVER);
		counter5++;

	}
}

mboxbench

6.2. Cooperative Scheduling

If we set all tasks at the same priority and every task yields the processor, they will run in a round-robin fashion, one after another. So, every time we pause, chances are we will be "somewhere in the middle" of a round.

If every task increments a counter before yielding, we expect to see a set of counters of the form {K, K, K, K-1, K-1}. No counter will be ahead of another by more than 1 if the scheduler is deterministic.

/* All tasks have the same priority */
VOID Task1(VOID* args)
{
    RK_UNUSEARGS

	while (1)
	{
		count1 += 1;
		kYield();
	}
}

VOID Task2(VOID* args)
{
    RK_UNUSEARGS
	while (1)
	{
		count2 += 1;
		kYield();
	}
}

VOID Task3(VOID* args)
{
    RK_UNUSEARGS
	while (1)
	{
		count3 += 1;
		kYield();
	}
}

VOID Task4(VOID* args)
{
    RK_UNUSEARGS
	while (1)
	{
		count4 += 1;
		kYield();
	}

}

VOID Task5(VOID* args)
{
    RK_UNUSEARGS
	while (1)
	{
		count5 += 1;
		kYield();
	}

}

The picture below shows the results after ~ 13 million rounds.

determrr
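The expected shape of the counters follows from the round-robin order alone, which a few lines of host-side C can confirm. This is a model of the dispatch order for five equal-priority tasks that each increment and yield, not a measurement on the target; rrSpread and N_TASKS are names chosen for the sketch.

```c
#include <assert.h>

#define N_TASKS 5u

/* Run 'steps' increment-and-yield turns in strict round-robin order and
 * return the spread (max - min) of the resulting counters. */
static unsigned long rrSpread(unsigned long steps,
                              unsigned long counters[N_TASKS])
{
    unsigned long min, max;

    for (unsigned i = 0; i < N_TASKS; i++)
        counters[i] = 0;
    for (unsigned long s = 0; s < steps; s++)
        counters[s % N_TASKS]++;         /* the task whose turn it is */

    min = max = counters[0];
    for (unsigned i = 1; i < N_TASKS; i++)
    {
        if (counters[i] < min) min = counters[i];
        if (counters[i] > max) max = counters[i];
    }
    return max - min;
}
```

Stopping after 13 turns leaves the counters at {3, 3, 3, 2, 2}; stopping at a multiple of five leaves them all equal. A spread above 1 on the target would mean some task was dispatched out of turn.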

6.3. Memory Allocator Determinism

The memory allocator, if well employed, will never fail, and it should take the same amount of time to allocate and to free a block. In the test below, three tasks with the same priority each allocate a 128-byte block, free it, and increase a counter. If the allocator exhibits deterministic behaviour, these counters should differ by at most 1 whenever we pause the device.

#include "application.h"

INT stack1[STACKSIZE];
INT stack2[STACKSIZE];
INT stack3[STACKSIZE];

RK_MEM bufPool;
#define BLOCK_SIZE	128
#define	N_BLOCKS	3
BYTE buf[N_BLOCKS][BLOCK_SIZE];


VOID kApplicationInit(VOID)
{
	kMemInit(&bufPool, buf, BLOCK_SIZE, N_BLOCKS);
}

volatile int counter1 = 0, counter2 = 0, counter3 = 0;

VOID Task1(VOID* args)
{
    RK_UNUSEARGS
	while (1)
	{
		BYTE* addr = kMemAlloc(&bufPool);
		kassert(addr!=NULL);
		RK_ERR err = kMemFree(&bufPool, addr);
		kassert(err==0);
		counter1++;
		kYield();
	}
}

VOID Task2(VOID* args)
{
    RK_UNUSEARGS
	while (1)
	{

		BYTE* addr = kMemAlloc(&bufPool);
		kassert(addr!=NULL);
		RK_ERR err = kMemFree(&bufPool, addr);
		kassert(err==0);
		counter2++;
		kYield();
	}
}

VOID Task3(VOID* args)
{
    RK_UNUSEARGS
	while (1)
	{

		BYTE* addr = kMemAlloc(&bufPool);
		kassert(addr!=NULL);
		RK_ERR err = kMemFree(&bufPool, addr);
		kassert(err==0);
		counter3++;
		kYield();
	}

}

Below are the results after ~2.5 million ticks.

determmem

7. Influences

For the sake of intellectual honesty and acknowledgment, I list the systems that have inspired RK0's (still incipient) design:

  • Architecture and features: highly influenced by early Nucleus, ThreadX and uC/OS, which in turn (it seems to me) were influenced by pSOS.

  • Doubly linked list ADT: draws inspiration from the Linux 2.6.x Dlist implementation.

  • Application timers (Callouts): inspired by 4.4BSD’s use of delta lists.

  • MRM Protocol: inspired by HARTIK RTOS.


mascott

© 2025 Antonio Giacomelli | All Rights Reserved | www.kernel0.org