/*
 * Copyright (c) 2024 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */

#include "kpc.h"
#include <kern/mpsc_ring.h>
#include <kern/assert.h>
#include <kern/kalloc.h>
#include <os/atomic_private.h>

/*
 * This ringbuffer has the following constraints:
 *
 * - Multiple-producer: More than one thread will need to write into the buffer
 *   at once.
 * - Single-consumer: Only the single reader under the global lock will consume
 *   and send samples to user space.
 * - Bounded: Writers will drop their data if there's no space left to write.
 * - Known-parallelism: A fixed number of writers.
 *
 * The ringbuffer that stores the kernel samples has a region of allocated
 * memory and offsets that are maintained by the reader and writers. The
 * offsets are 32 bits wide but are typically updated together atomically as a
 * single 64-bit value.  "Head" refers to the offset used for writing and
 * "tail" is the offset for reading.
 *
 * Writers follow a reserve-commit scheme to ensure that no other writer can
 * interfere with their view into the region and that the reader only sees
 * fully-written data.  To get a view that can store their data, the writers do
 * a relaxed load of the offsets and determine how to update the next writer
 * offset.  The following operations then happen in a loop (an illustrative
 * producer-side sketch follows this comment):
 *
 * - Add the size of the data to be written to a local copy of the `head`
 *   offset.
 * - Reserve their interest in the write offset by updating a per-CPU "holds"
 *   list with the current `head` value.
 * - Do a compare-exchange on the offsets to attempt to install the updated
 *   `head` offset.
 * - If this fails, continue the loop with updated values of the offsets.
 * - Otherwise, exit the loop.
 *
 * The reader will do an atomic load of the offsets with an acquire barrier and
 * remember the writer's offset.  Then it will loop through the per-CPU holds
 * and look for the one with the earliest offset.  The minimum of that hold
 * and the writer's offset marks the furthest point it can safely read samples.
 *
 * Here's a typical ringbuffer in use:
 *
 *                                                hold by 2
 *                                                ●       hold by 0
 *                                                │       ●
 *                                                │       │
 *  ┌─────────────────────────────────────────────▼───────▼─────────────┐
 *  │        █████████████████████████████████████░░░░░░░░░░░░          │
 *  └────────▲────────────────────────────────────────────────▲─────────┘
 *  0        │                                                │         capacity
 *           ●                                                ●
 *           tail                                             head
 *
 * The filled region after `tail` has been written and is ready to be read.  The
 * unfilled region has already been read and is available for writing.  There
 * are two concurrent writers (with IDs 2 and 0) with holds outstanding, marked
 * by the shaded region.  Here's a different configuration, after the `head`
 * has wrapped and with the reader "caught up" to the writers:
 *
 *                                                            hold by 3
 *                                                            ●
 *                                                            │
 *  ┌─────────────────────────────────────────────────────────▼─────────┐
 *  │░░░░                                                     ░░░░░░░░░░│
 *  └────▲────────────────────────────────────────────────────▲─────────┘
 *  0    │                                                    │         capacity
 *       ●                                                    ●
 *       head                                                 tail
 *
 * There's one writer active (ID 3), so `tail` can't advance past it. And
 * finally, here's a configuration where there's no more buffer space available
 * for writing:
 *
 *  ┌───────────────────────────────────────────────────────────────────┐
 *  │█████████████████████████████████████████████████████****██████████│
 *  └─────────────────────────────────────────────────────▲───▲─────────┘
 *  0                                                     │   │         capacity
 *                                                        ●   ●
 *                                                     head   tail
 *
 * Almost the entire buffer is waiting to be read.  The `*` between `head` and
 * `tail` is "wasted" space because writers need a contiguous region of memory
 * to write into. In this case, there's not enough of it before running into
 * `tail`.
 */
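
/*
 * As an illustrative sketch only (not part of this interface): a hypothetical
 * per-CPU producer identified by `cpu_id` hands a fixed-size record to the
 * ringbuffer with a single call; the reserve-commit loop described above runs
 * inside `mpsc_ring_write`.  The `ring` and `struct sample` names are
 * assumptions for the example.
 *
 *	struct sample s;
 *	fill_in_sample(&s); // hypothetical helper
 *	// The write may be dropped if there's not enough free space left.
 *	(void)mpsc_ring_write(&ring, cpu_id, &s, sizeof(s));
 */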

#define HOLD_EMPTY (~0)

void
mpsc_ring_init(
	struct mpsc_ring *buf,
	uint8_t capacity_pow_2,
	uint8_t writers_max)
{
	/*
	 * Check that this ringbuffer hasn't already been initialized.
	 */
	assert3p(buf->mr_buffer, ==, NULL);
	assert3u(buf->mr_capacity, ==, 0);

	/*
	 * Check for reasonable capacity values.
	 */
	assert3u(capacity_pow_2, <, 30);
	assert3u(capacity_pow_2, >, 0);

	/*
	 * Must be at least one potential writer.
	 */
	assert3u(writers_max, >, 0);

	*buf = (struct mpsc_ring){ 0 };

	/*
	 * Allocate the data buffer to the specified capacity.
	 */
	uint32_t capacity = 1U << capacity_pow_2;
	buf->mr_buffer = kalloc_data_tag(
		capacity,
		Z_WAITOK | Z_ZERO,
		VM_KERN_MEMORY_DIAG);
	if (!buf->mr_buffer) {
		panic(
			"mpsc_ring_init: failed to allocate %u bytes for buffer",
			capacity);
	}
	buf->mr_capacity = capacity;

	/*
	 * Allocate the per-writer holds array.
	 */
	size_t holds_size = writers_max * sizeof(buf->mr_writer_holds[0]);
	buf->mr_writer_holds = kalloc_data_tag(
		holds_size,
		Z_WAITOK | Z_ZERO,
		VM_KERN_MEMORY_DIAG);
	if (!buf->mr_writer_holds) {
		panic(
			"mpsc_ring_init: failed to allocate %zu bytes for holds",
			holds_size);
	}
	buf->mr_writer_count = writers_max;

	/*
	 * Initialize the holds to be empty.
	 */
	for (uint8_t i = 0; i < writers_max; i++) {
		buf->mr_writer_holds[i] = HOLD_EMPTY;
	}
	buf->mr_head_tail = (union mpsc_ring_head_tail){ 0 };
	/*
	 * Publish these updates.
	 */
	os_atomic_thread_fence(release);
}
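
/*
 * Illustrative sketch only: setting up a ring with a 64KiB buffer
 * (capacity_pow_2 = 16) and one writer slot per potential writer.  The `ring`
 * variable and `writer_count` value are assumptions for the example; in
 * practice the writer count would typically be the number of CPUs.
 *
 *	static struct mpsc_ring ring;
 *	mpsc_ring_init(&ring, 16, writer_count);
 */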

/**
 * Copy to or from the ringbuffer, taking wrap-around at the end into account.
 *
 * @discussion
 * This function does not enforce any bounds checking on the head or tail
 * offsets and is a helper for higher-level interfaces.
 *
 * @param buf
 * The ringbuffer to copy into or out of.
 *
 * @param offset
 * The offset to start the copy operation at.
 *
 * @param data
 * The input or output buffer.
 *
 * @param size
 * The number of bytes to copy.
 *
 * @param in
 * The direction of the copy.  True to treat @link data @/link as a source and
 * copy into the ringbuffer, false to treat @link data @/link as a destination
 * and copy out of the ringbuffer.
 */
OS_ALWAYS_INLINE
static void
_mpsc_ring_copy(
	const struct mpsc_ring *buf,
	uint32_t offset,
	void *data,
	uint32_t size,
	bool in)
{
	/*
	 * Find the offset into the ringbuffer's memory.
	 */
	uint32_t const offset_trunc = offset % buf->mr_capacity;

	/*
	 * Determine how much contiguous space is left in the ringbuffer for a
	 * single memcpy.
	 */
	uint32_t const left_contig = buf->mr_capacity - offset_trunc;
	uint32_t const size_contig = MIN(left_contig, size);
	memcpy(in ? &buf->mr_buffer[offset_trunc] : data,
	    in ? data : &buf->mr_buffer[offset_trunc],
	    size_contig);
	if (size_contig != size) {
		/*
		 * If there's any leftover data uncopied, copy it to the start of the
		 * ringbuffer.
		 */
		uint32_t const size_left = size - size_contig;
		void * const data_left = (char *)data + size_contig;
		memcpy(in ? buf->mr_buffer : data_left,
		    in ? data_left : buf->mr_buffer,
		    size_left);
	}
}
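
/*
 * Worked example with illustrative numbers: for a 64-byte capacity, copying
 * 10 bytes in at offset 60 truncates the offset to 60, copies the first
 * 4 bytes into bytes 60 through 63, and then copies the remaining 6 bytes
 * into bytes 0 through 5 at the start of the buffer.
 */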

uint32_t
mpsc_ring_write(
	struct mpsc_ring *buf,
	uint8_t writer_id,
	const void *data,
	uint32_t size)
{
	/*
	 * Get an initial guess at where to write.
	 */
	union mpsc_ring_head_tail head_tail = os_atomic_load(
		&buf->mr_head_tail,
		relaxed);
	union mpsc_ring_head_tail new_head_tail = { 0 };

	os_atomic_rmw_loop(
		&buf->mr_head_tail.mrht_head_tail,
		head_tail.mrht_head_tail /* old */,
		new_head_tail.mrht_head_tail /* new */,
		release,
	{
		/*
		 * Check that there's enough free space in the buffer for this write.
		 */
		uint32_t const used_after = head_tail.mrht_head + size - head_tail.mrht_tail;
		if (used_after >= buf->mr_capacity) {
		        /*
		         * Not enough space available for all the data, so give up.
		         */
		        os_atomic_rmw_loop_give_up(goto out);
		}

		/*
		 * Compute a new head offset based on the size being written.
		 */
		new_head_tail = head_tail;
		new_head_tail.mrht_head += size;

		/*
		 * Reserve the start of the space with a hold.
		 */
		os_atomic_store(
			&buf->mr_writer_holds[writer_id],
			head_tail.mrht_head,
			relaxed);
	});

	_mpsc_ring_copy(buf, head_tail.mrht_head, (void *)(uintptr_t)data, size, true);

out:
	/*
	 * Release the hold value so it can synchronize with acquires on the read
	 * side.
	 */
	os_atomic_store(&buf->mr_writer_holds[writer_id], HOLD_EMPTY, release);
	return buf->mr_capacity - (head_tail.mrht_head - head_tail.mrht_tail);
}

mpsc_ring_cursor_t
mpsc_ring_read_start(struct mpsc_ring *buf)
{
	/*
	 * Acquire to ensure that any updates to the holds are visible.
	 */
	union mpsc_ring_head_tail head_tail = os_atomic_load(&buf->mr_head_tail, acquire);
	for (uint8_t i = 0; i < buf->mr_writer_count; i++) {
		/*
		 * Check for any earlier holds to avoid reading past writes-in-progress.
		 */
		uint32_t hold = os_atomic_load(&buf->mr_writer_holds[i], relaxed);
		if (hold != HOLD_EMPTY) {
			head_tail.mrht_head = MIN(head_tail.mrht_head, hold);
		}
	}

	return (mpsc_ring_cursor_t){
		       .mrc_commit_pos = head_tail.mrht_tail,
		       .mrc_pos = head_tail.mrht_tail,
		       .mrc_limit = head_tail.mrht_head,
	};
}
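
/*
 * Worked example with illustrative numbers: with `head` at 120, `tail` at 40,
 * and two holds of { HOLD_EMPTY, 96 }, the cursor starts at offset 40 and its
 * limit is clamped from 120 down to 96, so the reader stops short of the
 * write still in progress by writer 1.
 */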

bool
mpsc_ring_cursor_advance(
	const struct mpsc_ring *buf,
	mpsc_ring_cursor_t *cursor,
	void *target,
	uint32_t size)
{
	if (size > cursor->mrc_limit - cursor->mrc_pos) {
		return false;
	}
	_mpsc_ring_copy(buf, cursor->mrc_pos, target, size, false);
	cursor->mrc_pos += size;
	return true;
}

void
mpsc_ring_cursor_commit(
	const struct mpsc_ring * __unused buf,
	mpsc_ring_cursor_t *cursor)
{
	cursor->mrc_commit_pos = cursor->mrc_pos;
}

void
mpsc_ring_read_finish(
	struct mpsc_ring *buf,
	mpsc_ring_cursor_t cursor)
{
	/*
	 * Relaxed, as there's no need to synchronize with any other readers: this
	 * ringbuffer is single-consumer.
	 */
	os_atomic_store(&buf->mr_head_tail.mrht_tail, cursor.mrc_commit_pos, relaxed);
}
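
/*
 * Illustrative sketch only: the single consumer drains whole records and
 * commits after each one, so a partially-read record is never released back
 * to the writers.  The `ring` and `struct sample` names, and handing records
 * to user space, are assumptions for the example.
 *
 *	mpsc_ring_cursor_t cursor = mpsc_ring_read_start(&ring);
 *	struct sample s;
 *	while (mpsc_ring_cursor_advance(&ring, &cursor, &s, sizeof(s))) {
 *		// ... copy `s` out to user space (hypothetical) ...
 *		mpsc_ring_cursor_commit(&ring, &cursor);
 *	}
 *	mpsc_ring_read_finish(&ring, cursor);
 */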

void
mpsc_ring_read_cancel(
	struct mpsc_ring * __unused buf,
	mpsc_ring_cursor_t __unused cursor)
{
	/*
	 * Nothing to do; just "consume" the cursor.
	 */
}