/*
 * Read-Copy Update mechanism for mutual exclusion
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, see <http://www.gnu.org/licenses/>.
 *
 * Copyright (C) IBM Corporation, 2001
 *
 * Authors: Dipankar Sarma <dipankar@in.ibm.com>
 *          Manfred Spraul <manfred@colorfullife.com>
 *
 * Modifications for Xen: Jose Renato Santos
 * Copyright (C) Hewlett-Packard, 2006
 *
 * Based on the original work by Paul McKenney <paulmck@us.ibm.com>
 * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
 * Papers:
 * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
 * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
 *
 * For detailed explanation of Read-Copy Update mechanism see -
 * http://lse.sourceforge.net/locking/rcupdate.html
 */

#include <xen/types.h>
#include <xen/kernel.h>
#include <xen/init.h>
#include <xen/spinlock.h>
#include <xen/smp.h>
#include <xen/rcupdate.h>
#include <xen/sched.h>
#include <asm/atomic.h>
#include <xen/bitops.h>
#include <xen/percpu.h>
#include <xen/softirq.h>
#include <xen/cpu.h>
#include <xen/stop_machine.h>

/* Global control variables for rcupdate callback mechanism. */
static struct rcu_ctrlblk {
    long cur;           /* Current batch number.                      */
    long completed;     /* Number of the last completed batch         */
    int  next_pending;  /* Is the next batch already waiting?         */

    spinlock_t lock __cacheline_aligned;
    cpumask_t  cpumask; /* CPUs that need to switch in order ...      */
    cpumask_t  idle_cpumask; /* ... unless they are already idle      */
                        /* ... for the current batch to proceed.      */
} __cacheline_aligned rcu_ctrlblk = {
    .cur = -300,
    .completed = -300,
    .lock = SPIN_LOCK_UNLOCKED,
};

/*
 * Per-CPU data for Read-Copy Update.
 * nxtlist - new callbacks are added here
 * curlist - current batch for which quiescent cycle started if any
 */
struct rcu_data {
    /* 1) quiescent state handling : */
    long            quiescbatch;    /* Batch # for grace period */
    int             qs_pending;     /* core waits for quiesc state */

    /* 2) batch handling */
    long            batch;          /* Batch # for current RCU batch */
    struct rcu_head *nxtlist;
    struct rcu_head **nxttail;
    long            qlen;           /* # of queued callbacks */
    struct rcu_head *curlist;
    struct rcu_head **curtail;
    struct rcu_head *donelist;
    struct rcu_head **donetail;
    long            blimit;         /* Upper limit on a processed batch */
    int             cpu;
    struct rcu_head barrier;
    long            last_rs_qlen;   /* qlen during the last resched */

    /* 3) idle CPUs handling */
    struct timer    idle_timer;
    bool            idle_timer_active;
};
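
/*
 * Callback lifecycle sketch: a callback queued via call_rcu() moves through
 * three per-CPU lists:
 *
 *   nxtlist  - just queued; not yet associated with any batch
 *   curlist  - part of batch 'rdp->batch', waiting for its grace period
 *              to complete
 *   donelist - grace period complete; invoked from rcu_do_batch()
 *
 * The matching *tail fields always point at the NULL link terminating the
 * respective list, making appends O(1).
 */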

/*
 * If a CPU with RCU callbacks queued goes idle before the grace period is
 * finished, how can we make sure that the callbacks will eventually be
 * executed? In Linux (2.6.21, the first "tickless idle" Linux kernel), the
 * periodic timer tick would not be stopped for such a CPU. Here in Xen,
 * we may not even have a periodic timer tick, so we need to use a
 * special purpose timer.
 *
 * Such timer:
 * 1) is armed only when a CPU with RCU callbacks queued goes idle before
 *    the end of the current grace period (_not_ for any CPU that goes
 *    idle!);
 * 2) when it fires, it is only re-armed if the grace period is still
 *    running;
 * 3) it is stopped immediately if the CPU wakes up from idle and resumes
 *    'normal' execution.
 *
 * How far in the future the timer should be programmed each time is hard
 * to tell (it is, inevitably, a guess). Since this mimics Linux's periodic
 * timer tick, take the values used there as an indication. In Linux 2.6.21,
 * the tick period can be 10ms, 4ms, 3.33ms or 1ms.
 *
 * By default, we use 10ms, to enable at least some power saving on the
 * CPU that is going idle. The user can change this, via a boot time
 * parameter, but only up to 100ms.
 */
#define IDLE_TIMER_PERIOD_MAX     MILLISECS(100)
#define IDLE_TIMER_PERIOD_DEFAULT MILLISECS(10)
#define IDLE_TIMER_PERIOD_MIN     MICROSECS(100)

static s_time_t __read_mostly idle_timer_period;

/*
 * Increment and decrement values for the idle timer handler. The algorithm
 * works as follows:
 * - if the timer actually fires, and it finds out that the grace period
 *   isn't over yet, we add IDLE_TIMER_PERIOD_INCR to the timer's period;
 * - if the timer actually fires and it finds the grace period over, we
 *   subtract IDLE_TIMER_PERIOD_DECR from the timer's period.
 */
#define IDLE_TIMER_PERIOD_INCR    MILLISECS(10)
#define IDLE_TIMER_PERIOD_DECR    MICROSECS(100)
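
/*
 * Worked example, with the default values: the period starts at 10ms; two
 * consecutive firings that find the grace period still running push it to
 * 20ms and then 30ms, saturating at IDLE_TIMER_PERIOD_MAX (100ms). Each
 * firing that finds the grace period finished shaves 100us off again, down
 * to IDLE_TIMER_PERIOD_MIN (100us).
 */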

static DEFINE_PER_CPU(struct rcu_data, rcu_data);

static int blimit = 10;
static int qhimark = 10000;
static int qlowmark = 100;
static int rsinterval = 1000;

struct rcu_barrier_data {
    struct rcu_head head;
    atomic_t *cpu_count;
};

static void rcu_barrier_callback(struct rcu_head *head)
{
    struct rcu_barrier_data *data = container_of(
        head, struct rcu_barrier_data, head);
    atomic_inc(data->cpu_count);
}

static int rcu_barrier_action(void *_cpu_count)
{
    struct rcu_barrier_data data = { .cpu_count = _cpu_count };

    ASSERT(!local_irq_is_enabled());
    local_irq_enable();

    /*
     * When the callback is executed, all previously-queued RCU work on this
     * CPU is completed. When all CPUs have executed their callback,
     * data.cpu_count will have been incremented to include every online CPU.
     */
    call_rcu(&data.head, rcu_barrier_callback);

    while ( atomic_read(data.cpu_count) != num_online_cpus() )
    {
        process_pending_softirqs();
        cpu_relax();
    }

    local_irq_disable();

    return 0;
}

int rcu_barrier(void)
{
    atomic_t cpu_count = ATOMIC_INIT(0);
    return stop_machine_run(rcu_barrier_action, &cpu_count, NR_CPUS);
}
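
/*
 * Usage sketch (illustrative; the helpers named here are hypothetical):
 * a subsystem that frees its objects via call_rcu() can make sure all of
 * those callbacks have actually run, e.g. before tearing down shared
 * state, by doing:
 *
 *   unlink_all_objects();   ...queues the frees via call_rcu()...
 *   rcu_barrier();          ...waits for every queued callback, on all
 *                              online CPUs, to have been invoked...
 *   teardown_shared_state();
 */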

/* Is batch a before batch b? */
static inline int rcu_batch_before(long a, long b)
{
    return (a - b) < 0;
}
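
/*
 * Example: the signed difference makes the comparison robust against
 * counter wraparound. If b has just wrapped past LONG_MAX while a has
 * not, a - b overflows back to a small negative number, so a is still
 * (correctly) reported as being before b. This is also why the negative
 * initial values of cur/completed above are harmless.
 */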

static void force_quiescent_state(struct rcu_data *rdp,
                                  struct rcu_ctrlblk *rcp)
{
    cpumask_t cpumask;

    raise_softirq(SCHEDULE_SOFTIRQ);
    if (unlikely(rdp->qlen - rdp->last_rs_qlen > rsinterval)) {
        rdp->last_rs_qlen = rdp->qlen;
        /*
         * Don't send IPI to itself. With irqs disabled,
         * rdp->cpu is the current cpu.
         */
        cpumask_andnot(&cpumask, &rcp->cpumask, cpumask_of(rdp->cpu));
        cpumask_raise_softirq(&cpumask, SCHEDULE_SOFTIRQ);
    }
}

/**
 * call_rcu - Queue an RCU callback for invocation after a grace period.
 * @head: structure to be used for queueing the RCU updates.
 * @func: actual update function to be invoked after the grace period
 *
 * The update function will be invoked some time after a full grace
 * period elapses, in other words after all currently executing RCU
 * read-side critical sections have completed. RCU read-side critical
 * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
 * and may be nested.
 */
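
/*
 * Illustrative usage sketch (not part of this file; 'struct foo' and
 * foo_destroy() are hypothetical): embed a struct rcu_head in the object,
 * unlink the object so no new readers can find it, then hand it to
 * call_rcu() and only free it from the callback:
 *
 *   struct foo {
 *       struct rcu_head rcu;
 *       int data;
 *   };
 *
 *   static void foo_destroy(struct rcu_head *head)
 *   {
 *       xfree(container_of(head, struct foo, rcu));
 *   }
 *
 *   ...unlink f from every reader-visible structure, then:...
 *   call_rcu(&f->rcu, foo_destroy);
 */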
void call_rcu(struct rcu_head *head,
              void (*func)(struct rcu_head *rcu))
{
    unsigned long flags;
    struct rcu_data *rdp;

    head->func = func;
    head->next = NULL;
    local_irq_save(flags);
    rdp = &__get_cpu_var(rcu_data);
    *rdp->nxttail = head;
    rdp->nxttail = &head->next;
    if (unlikely(++rdp->qlen > qhimark)) {
        rdp->blimit = INT_MAX;
        force_quiescent_state(rdp, &rcu_ctrlblk);
    }
    local_irq_restore(flags);
}

/*
 * Invoke the completed RCU callbacks. They are expected to be in
 * a per-cpu list.
 */
static void rcu_do_batch(struct rcu_data *rdp)
{
    struct rcu_head *next, *list;
    int count = 0;

    list = rdp->donelist;
    while (list) {
        next = rdp->donelist = list->next;
        list->func(list);
        list = next;
        rdp->qlen--;
        if (++count >= rdp->blimit)
            break;
    }
    if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
        rdp->blimit = blimit;
    if (!rdp->donelist)
        rdp->donetail = &rdp->donelist;
    else
        raise_softirq(RCU_SOFTIRQ);
}
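
/*
 * Throttling example: with the default blimit of 10, a donelist holding 25
 * callbacks is drained as 10 + 10 + 5 across three softirq invocations,
 * since RCU_SOFTIRQ is re-raised whenever entries remain. Only after
 * call_rcu() has bumped blimit to INT_MAX (qlen > qhimark) does a single
 * invocation drain everything, until qlen falls back below qlowmark.
 */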

/*
 * Grace period handling:
 * The grace period handling consists of two steps:
 * - A new grace period is started.
 *   This is done by rcu_start_batch. The start is not broadcast to
 *   all cpus, they must pick this up by comparing rcp->cur with
 *   rdp->quiescbatch. All cpus are recorded in the
 *   rcu_ctrlblk.cpumask bitmap.
 * - All cpus must go through a quiescent state.
 *   Since the start of the grace period is not broadcast, at least two
 *   calls to rcu_check_quiescent_state are required:
 *   The first call just notices that a new grace period is running. The
 *   following calls check if there was a quiescent state since the
 *   beginning of the grace period. If so, it updates rcu_ctrlblk.cpumask.
 *   If the bitmap is empty, then the grace period is completed.
 *   rcu_check_quiescent_state calls rcu_start_batch() to start the next
 *   grace period (if necessary).
 */
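
/*
 * Worked example (illustrative): assume rcp->cur == rcp->completed == 5 and
 * CPU0 queues a callback.
 * 1. CPU0's softirq moves it from nxtlist to curlist, sets rdp->batch = 6,
 *    sets next_pending, and calls rcu_start_batch(), which advances
 *    rcp->cur to 6 and fills rcp->cpumask with the non-idle online CPUs.
 * 2. Each such CPU notices rcp->cur != rdp->quiescbatch, sets qs_pending,
 *    and calls cpu_quiet() once it has passed through a quiescent state.
 * 3. The last CPU to clear its bit sets rcp->completed = 6; CPU0's next
 *    softirq pass then moves curlist to donelist and invokes the callback.
 */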

/*
 * Register a new batch of callbacks, and start it up if there is currently no
 * active batch and the batch to be registered has not already occurred.
 * Caller must hold rcu_ctrlblk.lock.
 */
static void rcu_start_batch(struct rcu_ctrlblk *rcp)
{
    if (rcp->next_pending &&
        rcp->completed == rcp->cur) {
        rcp->next_pending = 0;
        /*
         * next_pending == 0 must be visible in
         * __rcu_process_callbacks() before it can see new value of cur.
         */
        smp_wmb();
        rcp->cur++;

        /*
         * Make sure the increment of rcp->cur is visible so that, even if
         * a CPU that is about to go idle is captured inside rcp->cpumask,
         * rcu_pending() will return false, which then means cpu_quiet()
         * will be invoked, before the CPU would actually enter idle.
         *
         * This barrier is paired with the one in rcu_idle_enter().
         */
        smp_mb();
        cpumask_andnot(&rcp->cpumask, &cpu_online_map, &rcp->idle_cpumask);
    }
}

/*
 * cpu went through a quiescent state since the beginning of the grace period.
 * Clear it from the cpu mask and complete the grace period if it was the last
 * cpu. Start another grace period if someone has further entries pending.
 */
static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
{
    cpumask_clear_cpu(cpu, &rcp->cpumask);
    if (cpumask_empty(&rcp->cpumask)) {
        /* batch completed ! */
        rcp->completed = rcp->cur;
        rcu_start_batch(rcp);
    }
}

/*
 * Check if the cpu has gone through a quiescent state (say context
 * switch). If so, and if it hasn't already done so in this RCU
 * quiescent cycle, then indicate that it has done so.
 */
static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
                                      struct rcu_data *rdp)
{
    if (rdp->quiescbatch != rcp->cur) {
        /* start new grace period: */
        rdp->qs_pending = 1;
        rdp->quiescbatch = rcp->cur;
        return;
    }

    /*
     * Grace period already completed for this cpu?
     * qs_pending is checked instead of the actual bitmap to avoid
     * cacheline thrashing.
     */
    if (!rdp->qs_pending)
        return;

    rdp->qs_pending = 0;

    spin_lock(&rcp->lock);
    /*
     * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
     * during cpu startup. Ignore the quiescent state.
     */
    if (likely(rdp->quiescbatch == rcp->cur))
        cpu_quiet(rdp->cpu, rcp);

    spin_unlock(&rcp->lock);
}

/*
 * This does the RCU processing work from softirq context.
 */
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
                                    struct rcu_data *rdp)
{
    if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
        *rdp->donetail = rdp->curlist;
        rdp->donetail = rdp->curtail;
        rdp->curlist = NULL;
        rdp->curtail = &rdp->curlist;
    }

    local_irq_disable();
    if (rdp->nxtlist && !rdp->curlist) {
        rdp->curlist = rdp->nxtlist;
        rdp->curtail = rdp->nxttail;
        rdp->nxtlist = NULL;
        rdp->nxttail = &rdp->nxtlist;
        local_irq_enable();

        /*
         * start the next batch of callbacks
         */

        /* determine batch number */
        rdp->batch = rcp->cur + 1;
        /* see the comment and corresponding wmb() in
         * the rcu_start_batch()
         */
        smp_rmb();

        if (!rcp->next_pending) {
            /* and start it/schedule start if it's a new batch */
            spin_lock(&rcp->lock);
            rcp->next_pending = 1;
            rcu_start_batch(rcp);
            spin_unlock(&rcp->lock);
        }
    } else {
        local_irq_enable();
    }
    rcu_check_quiescent_state(rcp, rdp);
    if (rdp->donelist)
        rcu_do_batch(rdp);
}

static void rcu_process_callbacks(void)
{
    __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
}

static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
{
    /*
     * This cpu has pending rcu entries and the grace period
     * for them has completed.
     */
    if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
        return 1;

    /* This cpu has no pending entries, but there are new entries */
    if (!rdp->curlist && rdp->nxtlist)
        return 1;

    /* This cpu has finished callbacks to invoke */
    if (rdp->donelist)
        return 1;

    /* The rcu core waits for a quiescent state from the cpu */
    if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
        return 1;

    /* nothing to do */
    return 0;
}

int rcu_pending(int cpu)
{
    return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu));
}

/*
 * Check to see if any future RCU-related work will need to be done
 * by the current CPU, even if none need be done immediately, returning
 * 1 if so. This function is part of the RCU implementation; it is -not-
 * an exported member of the RCU API.
 */
int rcu_needs_cpu(int cpu)
{
    struct rcu_data *rdp = &per_cpu(rcu_data, cpu);

    return (rdp->curlist && !rdp->idle_timer_active) || rcu_pending(cpu);
}

/*
 * Timer for making sure the CPU where a callback is queued periodically
 * pokes rcu_pending(), so that it will invoke the callback not too late
 * after the end of the grace period.
 */
void rcu_idle_timer_start(void)
{
    struct rcu_data *rdp = &this_cpu(rcu_data);

    /*
     * Note that we don't check rcu_pending() here. In fact, we don't want
     * the timer armed on CPUs that are in the process of quiescing while
     * going idle, unless they really are the ones with a queued callback.
     */
    if (likely(!rdp->curlist))
        return;

    set_timer(&rdp->idle_timer, NOW() + idle_timer_period);
    rdp->idle_timer_active = true;
}

void rcu_idle_timer_stop(void)
{
    struct rcu_data *rdp = &this_cpu(rcu_data);

    if (likely(!rdp->idle_timer_active))
        return;

    rdp->idle_timer_active = false;

    /*
     * In general, as the CPU is becoming active again, we don't need the
     * idle timer, and so we want to stop it.
     *
     * However, in case we are here because idle_timer has (just) fired and
     * has woken up the CPU, we skip stop_timer() now. In fact, when a CPU
     * wakes up from idle, this code always runs before do_softirq() has the
     * chance to check and deal with TIMER_SOFTIRQ. And if we stopped the
     * timer now, the TIMER_SOFTIRQ handler would see it as inactive, and
     * would not call rcu_idle_timer_handler().
     *
     * Therefore, if we see that the timer is expired already, we leave it
     * alone. The TIMER_SOFTIRQ handler will then run the timer routine, and
     * deactivate it.
     */
    if ( !timer_is_expired(&rdp->idle_timer) )
        stop_timer(&rdp->idle_timer);
}

static void rcu_idle_timer_handler(void *data)
{
    perfc_incr(rcu_idle_timer);

    if ( !cpumask_empty(&rcu_ctrlblk.cpumask) )
        idle_timer_period = min(idle_timer_period + IDLE_TIMER_PERIOD_INCR,
                                IDLE_TIMER_PERIOD_MAX);
    else
        idle_timer_period = max(idle_timer_period - IDLE_TIMER_PERIOD_DECR,
                                IDLE_TIMER_PERIOD_MIN);
}

void rcu_check_callbacks(int cpu)
{
    raise_softirq(RCU_SOFTIRQ);
}

static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
                           struct rcu_head **tail)
{
    local_irq_disable();
    *this_rdp->nxttail = list;
    if (list)
        this_rdp->nxttail = tail;
    local_irq_enable();
}

static void rcu_offline_cpu(struct rcu_data *this_rdp,
                            struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
{
    kill_timer(&rdp->idle_timer);

    /* If the cpu going offline owns the grace period we can block
     * indefinitely waiting for it, so flush it here.
     */
    spin_lock(&rcp->lock);
    if (rcp->cur != rcp->completed)
        cpu_quiet(rdp->cpu, rcp);
    spin_unlock(&rcp->lock);

    rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
    rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail);
    rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail);

    local_irq_disable();
    this_rdp->qlen += rdp->qlen;
    local_irq_enable();
}

static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
                                 struct rcu_data *rdp)
{
    memset(rdp, 0, sizeof(*rdp));
    rdp->curtail = &rdp->curlist;
    rdp->nxttail = &rdp->nxtlist;
    rdp->donetail = &rdp->donelist;
    rdp->quiescbatch = rcp->completed;
    rdp->qs_pending = 0;
    rdp->cpu = cpu;
    rdp->blimit = blimit;
    init_timer(&rdp->idle_timer, rcu_idle_timer_handler, rdp, cpu);
}

static int cpu_callback(
    struct notifier_block *nfb, unsigned long action, void *hcpu)
{
    unsigned int cpu = (unsigned long)hcpu;
    struct rcu_data *rdp = &per_cpu(rcu_data, cpu);

    switch ( action )
    {
    case CPU_UP_PREPARE:
        rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
        break;
    case CPU_UP_CANCELED:
    case CPU_DEAD:
        rcu_offline_cpu(&this_cpu(rcu_data), &rcu_ctrlblk, rdp);
        break;
    default:
        break;
    }

    return NOTIFY_DONE;
}

static struct notifier_block cpu_nfb = {
    .notifier_call = cpu_callback
};

void __init rcu_init(void)
{
    void *cpu = (void *)(long)smp_processor_id();
    static unsigned int __initdata idle_timer_period_ms =
                                    IDLE_TIMER_PERIOD_DEFAULT / MILLISECS(1);

    integer_param("rcu-idle-timer-period-ms", idle_timer_period_ms);

    /* We don't allow 0, or anything higher than IDLE_TIMER_PERIOD_MAX. */
    if ( idle_timer_period_ms == 0 ||
         idle_timer_period_ms > IDLE_TIMER_PERIOD_MAX / MILLISECS(1) )
    {
        idle_timer_period_ms = IDLE_TIMER_PERIOD_DEFAULT / MILLISECS(1);
        printk("WARNING: rcu-idle-timer-period-ms outside of "
               "(0,%"PRI_stime"]. Resetting it to %u.\n",
               IDLE_TIMER_PERIOD_MAX / MILLISECS(1), idle_timer_period_ms);
    }
    idle_timer_period = MILLISECS(idle_timer_period_ms);

    cpumask_clear(&rcu_ctrlblk.idle_cpumask);
    cpu_callback(&cpu_nfb, CPU_UP_PREPARE, cpu);
    register_cpu_notifier(&cpu_nfb);
    open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
}
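
/*
 * The timer period can therefore be tuned from the hypervisor command line,
 * e.g. "rcu-idle-timer-period-ms=5"; values outside (0,100] are rejected
 * and fall back to the 10ms default, as enforced above.
 */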

/*
 * The CPU is becoming idle, so no more read side critical
 * sections, and one more step toward grace period.
 */
void rcu_idle_enter(unsigned int cpu)
{
    ASSERT(!cpumask_test_cpu(cpu, &rcu_ctrlblk.idle_cpumask));
    cpumask_set_cpu(cpu, &rcu_ctrlblk.idle_cpumask);
    /*
     * If some other CPU is starting a new grace period, we'll notice that
     * by seeing a new value in rcp->cur (different from our quiescbatch).
     * That will force us all the way until cpu_quiet(), clearing our bit
     * in rcp->cpumask, even in case we managed to get in there.
     *
     * See the comment before cpumask_andnot() in rcu_start_batch().
     */
    smp_mb();
}

void rcu_idle_exit(unsigned int cpu)
{
    ASSERT(cpumask_test_cpu(cpu, &rcu_ctrlblk.idle_cpumask));
    cpumask_clear_cpu(cpu, &rcu_ctrlblk.idle_cpumask);
}