/*
Xen Store Daemon interface providing simple tree-like database.
Copyright (C) 2005 Rusty Russell IBM Corporation
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; If not, see .
*/
#define _GNU_SOURCE
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "xenstore.h"
#include
#include
#ifdef USE_PTHREAD
# include
#endif
#ifdef USE_DLSYM
# include
#endif
#ifndef O_CLOEXEC
#define O_CLOEXEC 0
#endif
#ifndef SOCK_CLOEXEC
#define SOCK_CLOEXEC 0
#endif
/* One message received from xenstored, queued on a reply or watch list. */
struct xs_stored_msg {
	XEN_TAILQ_ENTRY(struct xs_stored_msg) list;	/* linkage on reply_list / watch_list */
	struct xsd_sockmsg hdr;				/* wire header (type, tx id, payload len) */
	char *body;					/* malloc'd payload; NUL-terminated by read_message() */
};
/* Per-connection state for a xenstore client. */
struct xs_handle {
	/* Communications channel to xenstore daemon. */
	int fd;
	bool is_socket; /* is @fd a file or socket? */
	Xentoolcore__Active_Handle tc_ah; /* for restrict */

	/*
	 * A read thread which pulls messages off the comms channel and
	 * signals waiters.
	 */
#ifdef USE_PTHREAD
	pthread_t read_thr;
	int read_thr_exists;	/* guarded by request_mutex (see lock discipline) */
#endif

	/*
	 * A list of fired watch messages, protected by a mutex. Users can
	 * wait on the conditional variable until a watch is pending.
	 */
	XEN_TAILQ_HEAD(, struct xs_stored_msg) watch_list;
#ifdef USE_PTHREAD
	pthread_mutex_t watch_mutex;
	pthread_cond_t watch_condvar;
#endif

	/* Clients can select() on this pipe to wait for a watch to fire. */
	int watch_pipe[2];	/* created lazily by xs_fileno(); -1 until then */
	/* Filtering watch event in unwatch function? */
	bool unwatch_filter;	/* set from XS_UNWATCH_FILTER flag in xs_open() */

	/*
	 * A list of replies. Currently only one will ever be outstanding
	 * because we serialise requests. The requester can wait on the
	 * conditional variable for its response.
	 */
	XEN_TAILQ_HEAD(, struct xs_stored_msg) reply_list;
#ifdef USE_PTHREAD
	pthread_mutex_t reply_mutex;
	pthread_cond_t reply_condvar;

	/* One request at a time. */
	pthread_mutex_t request_mutex;

	/* Lock discipline:
	 * Only holder of the request lock may write to h->fd.
	 * Only holder of the request lock may access read_thr_exists.
	 * If read_thr_exists==0, only holder of request lock may read h->fd;
	 * If read_thr_exists==1, only the read thread may read h->fd.
	 * Only holder of the reply lock may access reply_list.
	 * Only holder of the watch lock may access watch_list.
	 * Lock hierarchy:
	 * The order in which to acquire locks is
	 * request_mutex
	 * reply_mutex
	 * watch_mutex
	 */
#endif
};
#ifdef USE_PTHREAD
/* Threaded build: map the shims onto real pthread primitives. */
# define mutex_lock(m) pthread_mutex_lock(m)
# define mutex_unlock(m) pthread_mutex_unlock(m)
# define condvar_signal(c) pthread_cond_signal(c)
# define condvar_wait(c, m) pthread_cond_wait(c, m)
# define cleanup_push(f, a) pthread_cleanup_push((void (*)(void *))(f), (void *)(a))
/*
 * Some definitions of pthread_cleanup_pop() are a macro starting with an
 * end-brace. GCC then complains if we immediately precede that with a label.
 * Hence we insert a dummy statement to appease the compiler in this situation.
 */
# define cleanup_pop(run) ((void)0); pthread_cleanup_pop(run)
# define read_thread_exists(h) ((h)->read_thr_exists)

/* Because pthread_cleanup_p* are not available when USE_PTHREAD is
 * disabled, use these macros which convert appropriately. */
# define cleanup_push_heap(p) cleanup_push(free, p)
# define cleanup_pop_heap(run, p) cleanup_pop((run))

static void *read_thread(void *arg);

#else /* USE_PTHREAD */
/* Single-threaded build: locking and cancellation hooks are no-ops. */
# define mutex_lock(m) ((void)0)
# define mutex_unlock(m) ((void)0)
# define condvar_signal(c) ((void)0)
# define condvar_wait(c, m) ((void)0)
# define cleanup_push(f, a) ((void)0)
# define cleanup_pop(run) ((void)0)
# define read_thread_exists(h) (0)
# define cleanup_push_heap(p) ((void)0)
/* Without cancellation support, free the heap object iff @run is true. */
# define cleanup_pop_heap(run, p) do { if ((run)) free(p); } while(0)
#endif /* !USE_PTHREAD */

/* Pull one message off h->fd; see the lock discipline in struct xs_handle. */
static int read_message(struct xs_handle *h, int nonblocking);
/* Switch O_NONBLOCK on @fd on (@nonblock != 0) or off (== 0).
 * Returns true on success. */
static bool setnonblock(int fd, int nonblock)
{
	int fl = fcntl(fd, F_GETFL);

	if (fl == -1)
		return false;

	fl = nonblock ? (fl | O_NONBLOCK) : (fl & ~O_NONBLOCK);

	return fcntl(fd, F_SETFL, fl) != -1;
}
/* Mark @fd close-on-exec.  Returns true on success. */
static bool set_cloexec(int fd)
{
	int fdflags = fcntl(fd, F_GETFD);

	if (fdflags == -1)
		return false;

	return fcntl(fd, F_SETFD, fdflags | FD_CLOEXEC) != -1;
}
/* Create a pipe with both ends close-on-exec.
 * Returns 0 on success, -1 (with errno from pipe/pipe2) on failure. */
static int pipe_cloexec(int fds[2])
{
#if HAVE_PIPE2
	/* pipe2() sets CLOEXEC atomically. */
	return pipe2(fds, O_CLOEXEC);
#else
	if (pipe(fds) == -1)
		return -1;

	/* Best effort to set CLOEXEC.  Racy against a concurrent fork/exec. */
	set_cloexec(fds[0]);
	set_cloexec(fds[1]);

	return 0;
#endif
}
/*
 * Return a file descriptor that becomes readable when a watch event is
 * pending.  The pipe is created lazily on first call; if events are already
 * queued, one byte is written immediately so select()/poll() fires.
 * Returns -1 if the pipe could not be created.
 */
int xs_fileno(struct xs_handle *h)
{
	char c = 0;

	mutex_lock(&h->watch_mutex);

	if ((h->watch_pipe[0] == -1) && (pipe_cloexec(h->watch_pipe) != -1)) {
		/* Kick things off if the watch list is already non-empty. */
		if (!XEN_TAILQ_EMPTY(&h->watch_list))
			while (write(h->watch_pipe[1], &c, 1) != 1)
				continue;
	}

	mutex_unlock(&h->watch_mutex);

	return h->watch_pipe[0];
}
/* Open a unix-domain stream socket connected to @connect_to.
 * Returns the fd, or -1 with errno set on failure. */
static int get_socket(const char *connect_to)
{
	struct sockaddr_un addr;
	int sock, saved_errno;

	sock = socket(PF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
	if (sock < 0)
		return -1;

	/* Compat for non-SOCK_CLOEXEC environments.  Racy. */
	if (!SOCK_CLOEXEC && !set_cloexec(sock))
		goto error;

	if (strlen(connect_to) >= sizeof(addr.sun_path)) {
		errno = EINVAL;
		goto error;
	}
	addr.sun_family = AF_UNIX;
	strcpy(addr.sun_path, connect_to);

	if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0)
		goto error;

	return sock;

error:
	saved_errno = errno;
	close(sock);
	errno = saved_errno;
	return -1;
}
/* Open the xenbus device node @connect_to read/write, close-on-exec.
 * Returns the fd, or -1 with errno set on failure. */
static int get_dev(const char *connect_to)
{
	int saved_errno;
	int fd = open(connect_to, O_RDWR | O_CLOEXEC);

	if (fd < 0)
		return -1;

	/* Compat for non-O_CLOEXEC environments.  Racy. */
	if (O_CLOEXEC || set_cloexec(fd))
		return fd;

	saved_errno = errno;
	close(fd);
	errno = saved_errno;
	return -1;
}
/* xentoolcore "restrict" callback: disable this connection by replacing
 * its fd with /dev/null (per xentoolcore__restrict_by_dup2_null). */
static int all_restrict_cb(Xentoolcore__Active_Handle *ah, domid_t domid)
{
	/* Recover the owning handle from the embedded active-handle member. */
	struct xs_handle *h = CONTAINER_OF(ah, *h, tc_ah);

	return xentoolcore__restrict_by_dup2_null(h->fd);
}
/*
 * Connect to xenstore at @connect_to (socket or device node, decided via
 * stat()) and return a fully initialised handle.
 * Returns NULL with errno set on failure; partial state is torn down.
 */
static struct xs_handle *get_handle(const char *connect_to)
{
	struct stat buf;
	struct xs_handle *h = NULL;
	int saved_errno;

	h = malloc(sizeof(*h));
	if (h == NULL)
		goto err;

	memset(h, 0, sizeof(*h));
	h->fd = -1;

	/* Register with xentoolcore's restrict machinery before connecting. */
	h->tc_ah.restrict_callback = all_restrict_cb;
	xentoolcore__register_active_handle(&h->tc_ah);

	if (stat(connect_to, &buf) != 0)
		goto err;

	/* A unix socket means a daemon; anything else is the kernel device. */
	h->is_socket = S_ISSOCK(buf.st_mode);

	if (h->is_socket)
		h->fd = get_socket(connect_to);
	else
		h->fd = get_dev(connect_to);

	if (h->fd == -1)
		goto err;

	XEN_TAILQ_INIT(&h->reply_list);
	XEN_TAILQ_INIT(&h->watch_list);

	/* Watch pipe is allocated on demand in xs_fileno(). */
	h->watch_pipe[0] = h->watch_pipe[1] = -1;

	h->unwatch_filter = false;

#ifdef USE_PTHREAD
	pthread_mutex_init(&h->watch_mutex, NULL);
	pthread_cond_init(&h->watch_condvar, NULL);
	pthread_mutex_init(&h->reply_mutex, NULL);
	pthread_cond_init(&h->reply_condvar, NULL);
	pthread_mutex_init(&h->request_mutex, NULL);
#endif

	return h;

err:
	saved_errno = errno;
	if (h) {
		/* Registration happened right after allocation, so it is
		 * always safe to deregister here when h is non-NULL. */
		xentoolcore__deregister_active_handle(&h->tc_ah);
		if (h->fd >= 0)
			close(h->fd);
	}
	free(h);
	errno = saved_errno;
	return NULL;
}
/* Legacy entry point: open a xenstore connection with default flags. */
struct xs_handle *xs_daemon_open(void)
{
	struct xs_handle *handle = xs_open(0);

	return handle;
}
/* Legacy entry point; read-only mode is no longer distinct from xs_open(0). */
struct xs_handle *xs_daemon_open_readonly(void)
{
	struct xs_handle *handle = xs_open(0);

	return handle;
}
/* Legacy entry point; identical to xs_open(0), which probes socket then device. */
struct xs_handle *xs_domain_open(void)
{
	struct xs_handle *handle = xs_open(0);

	return handle;
}
/* Return the path of the kernel xenbus device node, honouring the
 * XENSTORED_PATH environment override first. */
static const char *xs_domain_dev(void)
{
	const char *env = getenv("XENSTORED_PATH");

	if (env != NULL)
		return env;

#if defined(__RUMPUSER_XEN__) || defined(__RUMPRUN__)
	return "/dev/xen/xenbus";
#elif defined(__linux__)
	/* Prefer the devtmpfs node; fall back to the legacy procfs one. */
	return access("/dev/xen/xenbus", F_OK) ? "/proc/xen/xenbus"
					       : "/dev/xen/xenbus";
#elif defined(__NetBSD__)
	return "/kern/xen/xenbus";
#elif defined(__FreeBSD__)
	return "/dev/xen/xenstore";
#else
	return "/dev/xen/xenbus";
#endif
}
/* Open a xenstore connection: try the daemon socket first, then the
 * kernel device.  @flags may contain XS_UNWATCH_FILTER to enable local
 * filtering of stale watch events in xs_unwatch().  NULL on failure. */
struct xs_handle *xs_open(unsigned long flags)
{
	struct xs_handle *xsh = get_handle(xs_daemon_socket());

	if (xsh == NULL)
		xsh = get_handle(xs_domain_dev());

	if (xsh != NULL && (flags & XS_UNWATCH_FILTER))
		xsh->unwatch_filter = true;

	return xsh;
}
/* Free every queued reply and watch message, bodies included.
 * Caller must hold (or have no need for) the reply and watch locks. */
static void close_free_msgs(struct xs_handle *h)
{
	struct xs_stored_msg *msg, *tmsg;

	XEN_TAILQ_FOREACH_SAFE(msg, &h->reply_list, list, tmsg) {
		free(msg->body);
		free(msg);
	}

	XEN_TAILQ_FOREACH_SAFE(msg, &h->watch_list, list, tmsg) {
		free(msg->body);
		free(msg);
	}
}
/* Close the lazily-created watch pipe (if any) and the comms fd,
 * deregister from xentoolcore, then free the handle itself. */
static void close_fds_free(struct xs_handle *h)
{
	if (h->watch_pipe[0] != -1) {
		close(h->watch_pipe[0]);
		close(h->watch_pipe[1]);
	}

	xentoolcore__deregister_active_handle(&h->tc_ah);
	close(h->fd);
	free(h);
}
/* Tear down a handle inherited across fork(): the child has no reader
 * thread and no lock holders, so free everything without locking. */
void xs_daemon_destroy_postfork(struct xs_handle *h)
{
	close_free_msgs(h);
	close_fds_free(h);
}
/*
 * Shut the handle down: cancel and join the reader thread (if running),
 * drain every queued message while holding all three locks, then close
 * the fds and free @h.
 */
void xs_daemon_close(struct xs_handle *h)
{
#ifdef USE_PTHREAD
	if (h->read_thr_exists) {
		pthread_cancel(h->read_thr);
		pthread_join(h->read_thr, NULL);
	}
#endif

	/* Acquire in hierarchy order: request -> reply -> watch. */
	mutex_lock(&h->request_mutex);
	mutex_lock(&h->reply_mutex);
	mutex_lock(&h->watch_mutex);

	close_free_msgs(h);

	mutex_unlock(&h->request_mutex);
	mutex_unlock(&h->reply_mutex);
	mutex_unlock(&h->watch_mutex);

	close_fds_free(h);
}
/* NULL-tolerant wrapper around xs_daemon_close(). */
void xs_close(struct xs_handle *xsh)
{
	if (xsh != NULL)
		xs_daemon_close(xsh);
}
/* Read exactly @len bytes from @fd into @data.
 * With @nonblocking set, either reads everything requested or nothing:
 * the fd is switched to non-blocking only for the first chunk, then put
 * back to blocking mode for the remainder.
 * Returns false with errno set on failure (EBADF if the peer closed). */
static bool read_all(int fd, void *data, unsigned int len, int nonblocking)
{
	if (!len)
		return true;

	if (nonblocking && !setnonblock(fd, 1))
		return false;

	while (len) {
		int done;

		done = read(fd, data, len);
		if (done < 0) {
			if (errno == EINTR)
				continue;
			goto out_false;
		}
		if (done == 0) {
			/* It closed fd on us? EBADF is appropriate. */
			errno = EBADF;
			goto out_false;
		}
		data += done;
		len -= done;

		/* First chunk arrived: revert to blocking for the rest. */
		if (nonblocking) {
			nonblocking = 0;
			if (!setnonblock(fd, 0))
				goto out_false;
		}
	}

	return true;

out_false:
	/* Best effort to restore blocking mode before reporting failure. */
	if (nonblocking)
		setnonblock(fd, 0);
	return false;
}
/* Write all @len bytes of @data to @fd, retrying on EINTR and short
 * writes.  Returns false on any other error or on a zero-length write. */
bool xs_write_all(int fd, const void *data, unsigned int len)
{
	const char *p = data;

	while (len) {
		ssize_t n = write(fd, p, len);

		if (n < 0) {
			if (errno == EINTR)
				continue;
			return false;
		}
		if (n == 0)
			return false;

		p += n;
		len -= n;
	}

	return true;
}
/* Map an error string returned by xenstored onto an errno value using the
 * xsd_errors table.  Unknown strings map to EINVAL. */
static int get_error(const char *errorstring)
{
	unsigned int i;

	for (i = 0; strcmp(errorstring, xsd_errors[i].errstring); i++)
		if (i == ARRAY_SIZE(xsd_errors) - 1)
			return EINVAL;
	return xsd_errors[i].errnum;
}
/* Adds extra nul terminator, because we generally (always?) hold strings. */
/*
 * Wait for the single outstanding reply and detach its body.
 * Without a reader thread the message is pulled off the fd directly;
 * with one, we block on reply_condvar until it arrives or the fd dies.
 * On success *type (and *len, if non-NULL) reflect the reply header and
 * the malloc'd body is returned; NULL with errno set otherwise.
 */
static void *read_reply(
	struct xs_handle *h, enum xsd_sockmsg_type *type, unsigned int *len)
{
	struct xs_stored_msg *msg;
	char *body;
	int read_from_thread;

	read_from_thread = read_thread_exists(h);

	/* Read from comms channel ourselves if there is no reader thread. */
	if (!read_from_thread && (read_message(h, 0) == -1))
		return NULL;

	mutex_lock(&h->reply_mutex);
#ifdef USE_PTHREAD
	while (XEN_TAILQ_EMPTY(&h->reply_list) && read_from_thread && h->fd != -1)
		condvar_wait(&h->reply_condvar, &h->reply_mutex);
#endif
	if (XEN_TAILQ_EMPTY(&h->reply_list)) {
		/* Connection died (fd == -1) or no reply materialised. */
		mutex_unlock(&h->reply_mutex);
		errno = EINVAL;
		return NULL;
	}
	msg = XEN_TAILQ_FIRST(&h->reply_list);
	XEN_TAILQ_REMOVE(&h->reply_list, msg, list);
	/* Requests are serialised, so at most one reply can be pending. */
	assert(XEN_TAILQ_EMPTY(&h->reply_list));
	mutex_unlock(&h->reply_mutex);

	*type = msg->hdr.type;
	if (len)
		*len = msg->hdr.len;
	body = msg->body;

	free(msg);

	return body;
}
/*
 * Update an iov/nr pair after an incomplete writev()/sendmsg().
 *
 * Awkwardly, nr has different widths and signs between writev() and
 * sendmsg(), so we take it and return it by value, rather than by pointer.
 */
static size_t update_iov(struct iovec **p_iov, size_t nr, size_t res)
{
	struct iovec *cur = *p_iov;

	/* Drop elements wholly covered by @res (empty ones included). */
	for (; nr && res >= cur->iov_len; cur++, nr--)
		res -= cur->iov_len;

	/* @res now lands inside *cur: trim off the consumed prefix. */
	if (res) {
		cur->iov_base = (char *)cur->iov_base + res;
		cur->iov_len -= res;
	}

	*p_iov = cur;
	return nr;
}
/*
 * Transmit the whole iovec over @fd via sendmsg(), retrying on EINTR and
 * resuming after short writes (MSG_NOSIGNAL suppresses SIGPIPE).
 * Returns true when everything was sent, false with errno set otherwise.
 * Note: may alter @iov in place while resubmitting.
 */
static bool sendmsg_exact(int fd, struct iovec *iov, unsigned int nr)
{
	struct msghdr hdr = {
		.msg_iov = iov,
		.msg_iovlen = nr,
	};

	while (hdr.msg_iovlen != 0) {
		ssize_t sent = sendmsg(fd, &hdr, MSG_NOSIGNAL);

		if (sent < 0) {
			if (errno == EINTR)
				continue;
			return false;
		}
		if (sent == 0)
			return false;

		hdr.msg_iovlen = update_iov(&hdr.msg_iov, hdr.msg_iovlen, sent);
	}

	return true;
}
/*
 * Wrapper around writev() to resubmit on EINTR or short write. Returns
 * @true if all data was transmitted, or @false with errno for an error.
 * Note: May alter @iov in place on resubmit.
 */
static bool writev_exact(int fd, struct iovec *iov, unsigned int nr)
{
	while (nr) {
		ssize_t res = writev(fd, iov, nr);

		if (res < 0 && errno == EINTR)
			continue;
		if (res <= 0)
			return false;

		/* Skip what was written; retry the rest. */
		nr = update_iov(&iov, nr, res);
	}
	return true;
}
/* Dispatch an outgoing request over the right primitive: sendmsg() for
 * sockets (so MSG_NOSIGNAL applies), writev() for the device node. */
static bool write_request(struct xs_handle *h, struct iovec *iov, unsigned int nr)
{
	return h->is_socket ? sendmsg_exact(h->fd, iov, nr)
			    : writev_exact(h->fd, iov, nr);
}
/*
 * Send message to xenstore, get malloc'ed reply. NULL and set errno on error.
 *
 * @iovec describes the entire outgoing message, starting with the xsd_sockmsg
 * header. xs_talkv() calculates the outgoing message length, updating
 * xsd_sockmsg in element 0. xs_talkv() might edit the iovec structure in
 * place (e.g. following short writes).
 *
 * On a write/read failure, or on a mismatched reply type, the connection
 * is considered unrecoverable and h->fd is closed.
 */
static void *xs_talkv(struct xs_handle *h,
		      struct iovec *iovec,
		      unsigned int num_vecs,
		      unsigned int *len)
{
	struct xsd_sockmsg *msg = iovec[0].iov_base;
	enum xsd_sockmsg_type reply_type;
	void *ret = NULL;
	int saved_errno;
	unsigned int i, msg_len;

	/* Element 0 must be xsd_sockmsg */
	assert(num_vecs >= 1);
	assert(iovec[0].iov_len == sizeof(*msg));

	/* Calculate the payload length by summing iovec elements.  Both the
	 * per-element and the running-total checks guard against overflowing
	 * XENSTORE_PAYLOAD_MAX (and against unsigned wrap). */
	for (i = 1, msg_len = 0; i < num_vecs; i++) {
		if ((iovec[i].iov_len > XENSTORE_PAYLOAD_MAX) ||
		    ((msg_len += iovec[i].iov_len) > XENSTORE_PAYLOAD_MAX)) {
			errno = E2BIG;
			return NULL;
		}
	}

	msg->len = msg_len;

	/* Serialise all requests on this handle. */
	mutex_lock(&h->request_mutex);

	if (!write_request(h, iovec, num_vecs))
		goto fail;

	ret = read_reply(h, &reply_type, len);
	if (!ret)
		goto fail;

	mutex_unlock(&h->request_mutex);

	if (reply_type == XS_ERROR) {
		/* Body is an error string; translate it to an errno value. */
		saved_errno = get_error(ret);
		free(ret);
		errno = saved_errno;
		return NULL;
	}

	if (reply_type != msg->type) {
		/* Protocol desync: reply doesn't match our request. */
		free(ret);
		saved_errno = EBADF;
		goto close_fd;
	}

	return ret;

fail:
	/* We're in a bad state, so close fd. */
	saved_errno = errno;
	mutex_unlock(&h->request_mutex);
close_fd:
	close(h->fd);
	h->fd = -1;
	errno = saved_errno;
	return NULL;
}
/* free() wrapper that preserves errno across the call. */
static void free_no_errno(void *p)
{
	int e = errno;

	free(p);
	errno = e;
}
/* Simplified version of xs_talkv: the payload is one NUL-terminated string. */
static void *xs_single(struct xs_handle *h, xs_transaction_t t,
		       enum xsd_sockmsg_type type,
		       const char *string,
		       unsigned int *len)
{
	struct xsd_sockmsg msg = { .type = type, .tx_id = t };
	struct iovec iov[] = {
		{ .iov_base = &msg,           .iov_len = sizeof(msg) },
		{ .iov_base = (void *)string, .iov_len = strlen(string) + 1 },
	};

	return xs_talkv(h, iov, ARRAY_SIZE(iov), len);
}
/* Collapse a reply pointer into success/failure, releasing the reply. */
static bool xs_bool(char *reply)
{
	bool ok = (reply != NULL);

	free(reply);
	return ok;
}
/*
 * Convert @len bytes of packed NUL-terminated names in @strings into one
 * malloc'd block: a char*[*num] index immediately followed by the copied
 * string data.  @strings is always consumed (freed), even on failure.
 * Returns NULL on allocation failure (errno preserved from malloc).
 */
static char **xs_directory_common(char *strings, unsigned int len,
				  unsigned int *num)
{
	char *p, **ret;

	/* Count the strings. */
	*num = xenstore_count_strings(strings, len);

	/* Transfer to one big alloc for easy freeing. */
	ret = malloc(*num * sizeof(char *) + len);
	if (!ret) {
		free_no_errno(strings);
		return NULL;
	}
	memcpy(&ret[*num], strings, len);
	free_no_errno(strings);

	/* Point each index slot at its string inside the copied data. */
	strings = (char *)&ret[*num];
	for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
		ret[(*num)++] = p;

	return ret;
}
/*
 * List a directory too large for a single XS_DIRECTORY reply by issuing
 * XS_DIRECTORY_PART requests at increasing offsets.  Each reply starts with
 * a generation string; if it changes between chunks the listing restarts.
 * Returns the same packed array as xs_directory_common(), or NULL with
 * errno set (ENOSYS from the daemon is reported as E2BIG).
 *
 * Fixes over the previous version: the realloc() result is checked (was
 * dereferenced unchecked), and @strings is no longer leaked when xs_talkv()
 * fails partway through the sequence.
 */
static char **xs_directory_part(struct xs_handle *h, xs_transaction_t t,
				const char *path, unsigned int *num)
{
	struct xsd_sockmsg msg = { .type = XS_DIRECTORY_PART, .tx_id = t };
	unsigned int off, result_len;
	char gen[24], offstr[8];
	struct iovec iov[3];
	char *result = NULL, *strings = NULL, *tmp;

	memset(gen, 0, sizeof(gen));

	for (off = 0;;) {
		snprintf(offstr, sizeof(offstr), "%u", off);
		iov[0].iov_base = &msg;
		iov[0].iov_len = sizeof(msg);
		iov[1].iov_base = (void *)path;
		iov[1].iov_len = strlen(path) + 1;
		iov[2].iov_base = (void *)offstr;
		iov[2].iov_len = strlen(offstr) + 1;

		result = xs_talkv(h, iov, ARRAY_SIZE(iov), &result_len);
		/* If XS_DIRECTORY_PART isn't supported return E2BIG. */
		if (!result) {
			free_no_errno(strings);	/* was leaked here */
			if (errno == ENOSYS)
				errno = E2BIG;
			return NULL;
		}

		if (off) {
			/* Generation changed under us: restart from scratch. */
			if (strcmp(gen, result)) {
				free(result);
				free(strings);
				strings = NULL;
				off = 0;
				continue;
			}
		} else
			strncpy(gen, result, sizeof(gen) - 1);

		/* Skip the generation string; the names follow it. */
		result_len -= strlen(result) + 1;

		/* Grow the accumulation buffer; keep the old one on failure. */
		tmp = realloc(strings, off + result_len);
		if (!tmp && (off + result_len) != 0) {
			free_no_errno(result);
			free_no_errno(strings);
			return NULL;
		}
		strings = tmp;
		memcpy(strings + off, result + strlen(result) + 1, result_len);
		free(result);
		off += result_len;

		/* A chunk ending in a double NUL marks the end of the list. */
		if (off <= 1 || strings[off - 2] == 0)
			break;
	}

	/* Drop the trailing empty string marking the end. */
	if (off > 1)
		off--;

	return xs_directory_common(strings, off, num);
}
/* List the children of @path.
 * Falls back to chunked XS_DIRECTORY_PART retrieval when the daemon
 * reports the reply would exceed the payload limit (E2BIG).
 * Returns a single malloc'd array of *num names (free() after use),
 * or NULL with errno set. */
char **xs_directory(struct xs_handle *h, xs_transaction_t t,
		    const char *path, unsigned int *num)
{
	unsigned int len;
	char *strings = xs_single(h, t, XS_DIRECTORY, path, &len);

	if (strings != NULL)
		return xs_directory_common(strings, len, num);

	return (errno == E2BIG) ? xs_directory_part(h, t, path, num) : NULL;
}
/* Get the value of a single file, nul terminated.
 * Returns a malloced value: call free() on it after use.
 * *len receives the length in bytes, not including the nul.
 * Returns NULL on failure.
 */
void *xs_read(struct xs_handle *h, xs_transaction_t t,
	      const char *path, unsigned int *len)
{
	void *value = xs_single(h, t, XS_READ, path, len);

	return value;
}
/* Write @len bytes of @data as the value of node @path.
 * Returns false on failure.
 */
bool xs_write(struct xs_handle *h, xs_transaction_t t,
	      const char *path, const void *data, unsigned int len)
{
	struct xsd_sockmsg msg = { .type = XS_WRITE, .tx_id = t };
	struct iovec iov[] = {
		{ .iov_base = &msg,         .iov_len = sizeof(msg) },
		{ .iov_base = (void *)path, .iov_len = strlen(path) + 1 },
		{ .iov_base = (void *)data, .iov_len = len },
	};

	return xs_bool(xs_talkv(h, iov, ARRAY_SIZE(iov), NULL));
}
/* Create a new directory node.
 * Returns false on failure; succeeds if the node already exists.
 */
bool xs_mkdir(struct xs_handle *h, xs_transaction_t t,
	      const char *path)
{
	char *reply = xs_single(h, t, XS_MKDIR, path, NULL);

	return xs_bool(reply);
}
/* Destroy a file or directory (directories must be empty).
 * Returns false on failure; succeeds if the node doesn't exist.
 */
bool xs_rm(struct xs_handle *h, xs_transaction_t t,
	   const char *path)
{
	char *reply = xs_single(h, t, XS_RM, path, NULL);

	return xs_bool(reply);
}
/* Get permissions of node (first element is owner).
 * Returns a malloc'd array of *num entries, or NULL: free() after use.
 */
struct xs_permissions *xs_get_permissions(struct xs_handle *h,
					  xs_transaction_t t,
					  const char *path, unsigned int *num)
{
	char *strings;
	unsigned int len;
	struct xs_permissions *perms;

	strings = xs_single(h, t, XS_GET_PERMS, path, &len);
	if (strings == NULL)
		return NULL;

	/* Each entry arrives as a NUL-terminated "perms + domid" string. */
	*num = xenstore_count_strings(strings, len);

	perms = malloc(*num * sizeof(struct xs_permissions));
	if (perms == NULL) {
		free_no_errno(strings);
		return NULL;
	}

	if (!xenstore_strings_to_perms(perms, *num, strings)) {
		free_no_errno(perms);
		perms = NULL;
	}

	free(strings);
	return perms;
}
/* Set permissions of node (must be owner).
 * Returns false on failure.
 *
 * Fix: the strdup() result is stored in iov[i + 2] but was NULL-checked
 * as iov[i + 1] (off-by-one), so an allocation failure went undetected
 * and could send a NULL iov_base.
 */
bool xs_set_permissions(struct xs_handle *h,
			xs_transaction_t t,
			const char *path,
			struct xs_permissions *perms,
			unsigned int num_perms)
{
	struct xsd_sockmsg msg = { .type = XS_SET_PERMS, .tx_id = t };
	unsigned int i;
	struct iovec iov[2 + num_perms];

	iov[0].iov_base = &msg;
	iov[0].iov_len = sizeof(msg);
	iov[1].iov_base = (void *)path;
	iov[1].iov_len = strlen(path) + 1;

	for (i = 0; i < num_perms; i++) {
		char buffer[MAX_STRLEN(unsigned int)+1];

		if (!xenstore_perm_to_string(&perms[i], buffer, sizeof(buffer)))
			goto unwind;

		/* Heap-copy each permission string so it outlives buffer. */
		iov[i + 2].iov_base = strdup(buffer);
		iov[i + 2].iov_len = strlen(buffer) + 1;
		if (!iov[i + 2].iov_base)	/* was iov[i + 1]: off-by-one */
			goto unwind;
	}

	if (!xs_bool(xs_talkv(h, iov, ARRAY_SIZE(iov), NULL)))
		goto unwind;

	for (i = 0; i < num_perms; i++)
		free(iov[i + 2].iov_base);

	return true;

unwind:
	/* Free only the permission strings allocated so far (errno kept). */
	num_perms = i;
	for (i = 0; i < num_perms; i++)
		free_no_errno(iov[i + 2].iov_base);
	return false;
}
/* Always returns false: this functionality was removed in Xen 4.9 and the
 * entry point is kept only for ABI compatibility. */
bool xs_restrict(struct xs_handle *h, unsigned domid)
{
	return false;
}
/* Watch a node for changes (poll on fd to detect, or call read_watch()).
 * When the node (or any child) changes, fd will become readable.
 * Token is returned when watch is read, to allow matching.
 * Returns false on failure.
 *
 * In threaded builds, the first call lazily spawns the reader thread
 * (with all signals blocked in it, and a bounded stack size).
 */
bool xs_watch(struct xs_handle *h, const char *path, const char *token)
{
	struct xsd_sockmsg msg = { .type = XS_WATCH };
	struct iovec iov[3];

#ifdef USE_PTHREAD
#define DEFAULT_THREAD_STACKSIZE (16 * 1024)
/* NetBSD doesn't have PTHREAD_STACK_MIN. */
#ifndef PTHREAD_STACK_MIN
# define PTHREAD_STACK_MIN 0
#endif
#define READ_THREAD_STACKSIZE					\
	((DEFAULT_THREAD_STACKSIZE < PTHREAD_STACK_MIN) ?	\
	 PTHREAD_STACK_MIN : DEFAULT_THREAD_STACKSIZE)

	/* We dynamically create a reader thread on demand. */
	mutex_lock(&h->request_mutex);
	if (!h->read_thr_exists) {
		sigset_t set, old_set;
		pthread_attr_t attr;
		static size_t stack_size;
#ifdef USE_DLSYM
		size_t (*getsz)(pthread_attr_t *attr);
#endif

		if (pthread_attr_init(&attr) != 0) {
			mutex_unlock(&h->request_mutex);
			return false;
		}
		if (!stack_size) {
#ifdef USE_DLSYM
			/* glibc-internal hook: minimum usable stack size. */
			getsz = dlsym(RTLD_DEFAULT, "__pthread_get_minstack");
			if (getsz)
				stack_size = getsz(&attr);
#endif
			if (stack_size < READ_THREAD_STACKSIZE)
				stack_size = READ_THREAD_STACKSIZE;
		}
		if (pthread_attr_setstacksize(&attr, stack_size) != 0) {
			pthread_attr_destroy(&attr);
			mutex_unlock(&h->request_mutex);
			return false;
		}

		/* Block all signals so the reader thread inherits none. */
		sigfillset(&set);
		pthread_sigmask(SIG_SETMASK, &set, &old_set);

		if (pthread_create(&h->read_thr, &attr, read_thread, h) != 0) {
			pthread_sigmask(SIG_SETMASK, &old_set, NULL);
			pthread_attr_destroy(&attr);
			mutex_unlock(&h->request_mutex);
			return false;
		}
		h->read_thr_exists = 1;
		pthread_sigmask(SIG_SETMASK, &old_set, NULL);
		pthread_attr_destroy(&attr);
	}
	mutex_unlock(&h->request_mutex);
#endif

	iov[0].iov_base = &msg;
	iov[0].iov_len = sizeof(msg);
	iov[1].iov_base = (void *)path;
	iov[1].iov_len = strlen(path) + 1;
	iov[2].iov_base = (void *)token;
	iov[2].iov_len = strlen(token) + 1;

	return xs_bool(xs_talkv(h, iov, ARRAY_SIZE(iov), NULL));
}
/* Clear the pipe token if there are no more pending watches.
 * We suppose the watch_mutex is already taken.
 */
static void xs_maybe_clear_watch_pipe(struct xs_handle *h)
{
	char c;

	/* Drain the single wakeup byte so select()/poll() stops firing. */
	if (XEN_TAILQ_EMPTY(&h->watch_list) && (h->watch_pipe[0] != -1))
		while (read(h->watch_pipe[0], &c, 1) != 1)
			continue;
}
/* Find out what node change was on (will block if nothing pending).
 * Returns an array of pointers into one malloc'd block (path, token, and
 * any extra strings), or NULL.  *num receives the string count.
 * Call free() on the returned array after use.
 * With @nonblocking set, returns NULL/EAGAIN instead of blocking.
 */
static char **read_watch_internal(struct xs_handle *h, unsigned int *num,
				  int nonblocking)
{
	struct xs_stored_msg *msg;
	char **ret, *strings;
	unsigned int num_strings, i;

	mutex_lock(&h->watch_mutex);

#ifdef USE_PTHREAD
	/* Wait on the condition variable for a watch to fire.
	 * If the reader thread doesn't exist yet, then that's because
	 * we haven't called xs_watch. Presumably the application
	 * will do so later; in the meantime we just block.
	 */
	while (XEN_TAILQ_EMPTY(&h->watch_list) && h->fd != -1) {
		if (nonblocking) {
			mutex_unlock(&h->watch_mutex);
			errno = EAGAIN;
			return 0;
		}
		condvar_wait(&h->watch_condvar, &h->watch_mutex);
	}
#else /* !defined(USE_PTHREAD) */
	/* Read from comms channel ourselves if there are no threads
	 * and therefore no reader thread. */
	assert(!read_thread_exists(h)); /* not threadsafe but worth a check */
	if ((read_message(h, nonblocking) == -1))
		return NULL;
#endif /* !defined(USE_PTHREAD) */

	if (XEN_TAILQ_EMPTY(&h->watch_list)) {
		/* Connection died (fd == -1) with nothing queued. */
		mutex_unlock(&h->watch_mutex);
		errno = EINVAL;
		return NULL;
	}
	msg = XEN_TAILQ_FIRST(&h->watch_list);
	XEN_TAILQ_REMOVE(&h->watch_list, msg, list);

	xs_maybe_clear_watch_pipe(h);
	mutex_unlock(&h->watch_mutex);

	assert(msg->hdr.type == XS_WATCH_EVENT);

	strings = msg->body;
	num_strings = xenstore_count_strings(strings, msg->hdr.len);

	/* One alloc holding the pointer index followed by the string data. */
	ret = malloc(sizeof(char*) * num_strings + msg->hdr.len);
	if (!ret) {
		free_no_errno(strings);
		free_no_errno(msg);
		return NULL;
	}

	ret[0] = (char *)(ret + num_strings);
	memcpy(ret[0], strings, msg->hdr.len);

	free(strings);
	free(msg);

	/* Point the remaining index slots at their strings. */
	for (i = 1; i < num_strings; i++)
		ret[i] = ret[i - 1] + strlen(ret[i - 1]) + 1;

	*num = num_strings;

	return ret;
}
/* Non-blocking variant of xs_read_watch(): returns NULL/EAGAIN when no
 * event is pending.  A successful result has at least path and token. */
char **xs_check_watch(struct xs_handle *h)
{
	unsigned int num;
	char **event = read_watch_internal(h, &num, 1);

	if (event != NULL)
		assert(num >= 2);

	return event;
}
/* Find out what node change was on (will block if nothing pending).
 * Returns array of two pointers: path and token, or NULL.
 * Call free() after use.
 */
char **xs_read_watch(struct xs_handle *h, unsigned int *num)
{
	/* Blocking variant of read_watch_internal(). */
	return read_watch_internal(h, num, 0);
}
/* Remove a watch on a node.
 * Returns false on failure (no watch on that node).
 * With XS_UNWATCH_FILTER enabled, events already queued locally for this
 * watch are dropped so the application never sees stale ones.
 *
 * Fix: dropped messages previously leaked msg->body, which is a separate
 * allocation from msg itself.
 */
bool xs_unwatch(struct xs_handle *h, const char *path, const char *token)
{
	struct xsd_sockmsg sockmsg = { .type = XS_UNWATCH };
	struct iovec iov[3];
	struct xs_stored_msg *msg, *tmsg;
	bool res;
	char *s, *p;
	unsigned int i;
	char *l_token, *l_path;

	iov[0].iov_base = &sockmsg;
	iov[0].iov_len = sizeof(sockmsg);
	iov[1].iov_base = (char *)path;
	iov[1].iov_len = strlen(path) + 1;
	iov[2].iov_base = (char *)token;
	iov[2].iov_len = strlen(token) + 1;

	res = xs_bool(xs_talkv(h, iov, ARRAY_SIZE(iov), NULL));

	if (!h->unwatch_filter) /* Don't filter the watch list */
		return res;

	/* Filter the watch list to remove potential message */
	mutex_lock(&h->watch_mutex);

	if (XEN_TAILQ_EMPTY(&h->watch_list)) {
		mutex_unlock(&h->watch_mutex);
		return res;
	}

	XEN_TAILQ_FOREACH_SAFE(msg, &h->watch_list, list, tmsg) {
		assert(msg->hdr.type == XS_WATCH_EVENT);

		/* Walk the NUL-separated strings in the body, picking out
		 * the XS_WATCH_PATH and XS_WATCH_TOKEN elements. */
		s = msg->body;
		l_token = NULL;
		l_path = NULL;

		for (p = s, i = 0; p < msg->body + msg->hdr.len; p++) {
			if (*p == '\0') {
				if (i == XS_WATCH_TOKEN)
					l_token = s;
				else if (i == XS_WATCH_PATH)
					l_path = s;
				i++;
				s = p + 1;
			}
		}

		if (l_token && !strcmp(token, l_token) &&
		    l_path && xs_path_is_subpath(path, l_path)) {
			XEN_TAILQ_REMOVE(&h->watch_list, msg, list);
			free(msg->body);	/* was leaked */
			free(msg);
		}
	}

	/* Drain the wakeup byte if the list is now empty. */
	xs_maybe_clear_watch_pipe(h);
	mutex_unlock(&h->watch_mutex);

	return res;
}
/* Start a transaction: changes by others will not be seen during this
 * transaction, and changes will not be visible to others until end.
 * Returns XBT_NULL on failure.
 */
xs_transaction_t xs_transaction_start(struct xs_handle *h)
{
	xs_transaction_t tx;
	char *reply = xs_single(h, XBT_NULL, XS_TRANSACTION_START, "", NULL);

	if (reply == NULL)
		return XBT_NULL;

	/* The daemon replies with the new transaction id as decimal text. */
	tx = strtoul(reply, NULL, 0);
	free(reply);

	return tx;
}
/* End a transaction.
 * If abort is true, transaction is discarded instead of committed.
 * Returns false on failure, which indicates an error: transactions will
 * not fail spuriously.
 */
bool xs_transaction_end(struct xs_handle *h, xs_transaction_t t,
			bool abort)
{
	/* "T" commits, "F" discards. */
	const char *arg = abort ? "F" : "T";

	return xs_bool(xs_single(h, t, XS_TRANSACTION_END, arg, NULL));
}
/* Introduce a new domain.
 * This tells the store daemon about a shared memory page and event channel
 * associated with a domain: the domain uses these to communicate.
 */
bool xs_introduce_domain(struct xs_handle *h,
			 unsigned int domid, unsigned long mfn,
			 unsigned int eventchn)
{
	struct xsd_sockmsg msg = { .type = XS_INTRODUCE };
	char domid_str[MAX_STRLEN(domid)];
	char mfn_str[MAX_STRLEN(mfn)];
	char eventchn_str[MAX_STRLEN(eventchn)];

	snprintf(domid_str, sizeof(domid_str), "%u", domid);
	snprintf(mfn_str, sizeof(mfn_str), "%lu", mfn);
	snprintf(eventchn_str, sizeof(eventchn_str), "%u", eventchn);

	struct iovec iov[] = {
		{ .iov_base = &msg,         .iov_len = sizeof(msg) },
		{ .iov_base = domid_str,    .iov_len = strlen(domid_str) + 1 },
		{ .iov_base = mfn_str,      .iov_len = strlen(mfn_str) + 1 },
		{ .iov_base = eventchn_str, .iov_len = strlen(eventchn_str) + 1 },
	};

	return xs_bool(xs_talkv(h, iov, ARRAY_SIZE(iov), NULL));
}
/* Associate @domid with target domain @target (stubdomain support).
 * Returns false on failure. */
bool xs_set_target(struct xs_handle *h,
		   unsigned int domid, unsigned int target)
{
	struct xsd_sockmsg msg = { .type = XS_SET_TARGET };
	char domid_str[MAX_STRLEN(domid)];
	char target_str[MAX_STRLEN(target)];

	snprintf(domid_str, sizeof(domid_str), "%u", domid);
	snprintf(target_str, sizeof(target_str), "%u", target);

	struct iovec iov[] = {
		{ .iov_base = &msg,       .iov_len = sizeof(msg) },
		{ .iov_base = domid_str,  .iov_len = strlen(domid_str) + 1 },
		{ .iov_base = target_str, .iov_len = strlen(target_str) + 1 },
	};

	return xs_bool(xs_talkv(h, iov, ARRAY_SIZE(iov), NULL));
}
/* Issue a request of @type whose only payload is the decimal domid. */
static void *single_with_domid(struct xs_handle *h,
			       enum xsd_sockmsg_type type,
			       unsigned int domid)
{
	char buf[MAX_STRLEN(domid)];

	snprintf(buf, sizeof(buf), "%u", domid);

	return xs_single(h, XBT_NULL, type, buf, NULL);
}
/* Tell the daemon a domain is going away.  Returns false on failure. */
bool xs_release_domain(struct xs_handle *h, unsigned int domid)
{
	char *reply = single_with_domid(h, XS_RELEASE, domid);

	return xs_bool(reply);
}
/* Clear the shutdown bit for the given domain.  Returns false on failure. */
bool xs_resume_domain(struct xs_handle *h, unsigned int domid)
{
	char *reply = single_with_domid(h, XS_RESUME, domid);

	return xs_bool(reply);
}
/* Return the malloc'd home path of @domid in the store (free() after use),
 * or NULL on failure. */
char *xs_get_domain_path(struct xs_handle *h, unsigned int domid)
{
	char *path = single_with_domid(h, XS_GET_DOMAIN_PATH, domid);

	return path;
}
/* Is @child equal to @parent, or inside @parent's subtree?
 * A longer child must continue at a '/' component boundary. */
bool xs_path_is_subpath(const char *parent, const char *child)
{
	size_t plen = strlen(parent);
	size_t clen = strlen(child);

	if (clen < plen)
		return false;

	if (strncmp(child, parent, plen) != 0)
		return false;

	return clen == plen || child[plen] == '/';
}
/* Has @domid been introduced to the store daemon?
 * The daemon answers "T" or "F"; errors count as not-introduced. */
bool xs_is_domain_introduced(struct xs_handle *h, unsigned int domid)
{
	bool introduced = false;
	char *reply = single_with_domid(h, XS_IS_DOMAIN_INTRODUCED, domid);

	if (reply != NULL) {
		introduced = (strcmp("F", reply) != 0);
		free(reply);
	}

	return introduced;
}
/* Look up the suspend event-channel port advertised by @domid under
 * /local/domain/<domid>/device/suspend/event-channel.
 * Returns the port number, or -1 on error.
 *
 * Fix: use snprintf() rather than unbounded sprintf() into path[].
 */
int xs_suspend_evtchn_port(int domid)
{
	char path[128];
	char *portstr;
	int port;
	unsigned int plen;
	struct xs_handle *xs;

	xs = xs_daemon_open();
	if (!xs)
		return -1;

	snprintf(path, sizeof(path),
		 "/local/domain/%d/device/suspend/event-channel", domid);
	portstr = xs_read(xs, XBT_NULL, path, &plen);
	xs_daemon_close(xs);

	if (!portstr || !plen) {
		port = -1;
		goto out;
	}

	port = atoi(portstr);

out:
	free(portstr);	/* free(NULL) is a no-op */
	return port;
}
/* Parse a decimal reply into *uintval and free the reply.
 * Returns false (leaving *uintval untouched) when @reply is NULL. */
static bool xs_uint(char *reply, unsigned int *uintval)
{
	if (reply == NULL)
		return false;

	*uintval = strtoul(reply, NULL, 10);
	free(reply);

	return true;
}
/* Query the feature bitmap supported by the store daemon itself.
 * Returns false on failure. */
bool xs_get_features_supported(struct xs_handle *h, unsigned int *features)
{
	struct xsd_sockmsg msg = { .type = XS_GET_FEATURE };
	struct iovec iov = { .iov_base = &msg, .iov_len = sizeof(msg) };

	/* Empty payload: the header alone forms the request. */
	return xs_uint(xs_talkv(h, &iov, 1, NULL), features);
}
/* Query the feature bitmap enabled for @domid.  Returns false on failure. */
bool xs_get_features_domain(struct xs_handle *h, unsigned int domid,
			    unsigned int *features)
{
	char *reply = single_with_domid(h, XS_GET_FEATURE, domid);

	return xs_uint(reply, features);
}
/* Set the feature bitmap for @domid.  Returns false on failure. */
bool xs_set_features_domain(struct xs_handle *h, unsigned int domid,
			    unsigned int features)
{
	struct xsd_sockmsg msg = { .type = XS_SET_FEATURE };
	char domid_str[MAX_STRLEN(domid)];
	char feat_str[MAX_STRLEN(features)];

	snprintf(domid_str, sizeof(domid_str), "%u", domid);
	snprintf(feat_str, sizeof(feat_str), "%u", features);

	struct iovec iov[] = {
		{ .iov_base = &msg,      .iov_len = sizeof(msg) },
		{ .iov_base = domid_str, .iov_len = strlen(domid_str) + 1 },
		{ .iov_base = feat_str,  .iov_len = strlen(feat_str) + 1 },
	};

	return xs_bool(xs_talkv(h, iov, ARRAY_SIZE(iov), NULL));
}
/* Issue an XS_CONTROL command @cmd with @len bytes of opaque @data.
 * Returns the malloc'd reply (free() after use), or NULL on failure. */
char *xs_control_command(struct xs_handle *h, const char *cmd,
			 void *data, unsigned int len)
{
	struct xsd_sockmsg msg = { .type = XS_CONTROL };
	struct iovec iov[] = {
		{ .iov_base = &msg,        .iov_len = sizeof(msg) },
		{ .iov_base = (void *)cmd, .iov_len = strlen(cmd) + 1 },
		{ .iov_base = data,        .iov_len = len },
	};

	return xs_talkv(h, iov, ARRAY_SIZE(iov), NULL);
}
/* Deprecated name for xs_control_command(). */
char *xs_debug_command(struct xs_handle *h, const char *cmd,
		       void *data, unsigned int len)
{
	char *reply = xs_control_command(h, cmd, data, len);

	return reply;
}
static int read_message(struct xs_handle *h, int nonblocking)
{
	/* IMPORTANT: It is forbidden to call this function without
	 * acquiring the request lock and checking that h->read_thr_exists
	 * is false. See "Lock discipline" in struct xs_handle, above. */

	/* If nonblocking==1, this function will always read either
	 * nothing, returning -1 and setting errno==EAGAIN, or we read
	 * whole amount requested. Ie as soon as we have the start of
	 * the message we block until we get all of it.
	 *
	 * Returns 0 on success, -1 with errno set on failure.  The
	 * cleanup_push/pop pairs keep msg/body from leaking if the
	 * reader thread is cancelled at one of the cancellation points.
	 */
	struct xs_stored_msg *msg = NULL;
	char *body = NULL;
	int saved_errno = 0;
	int ret = -1;

	/* Allocate message structure and read the message header. */
	msg = malloc(sizeof(*msg));
	if (msg == NULL)
		goto error;
	cleanup_push_heap(msg);
	if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr), nonblocking)) { /* Cancellation point */
		saved_errno = errno;
		goto error_freemsg;
	}

	/* Sanity check message body length. */
	if (msg->hdr.len > XENSTORE_PAYLOAD_MAX) {
		saved_errno = E2BIG;
		goto error_freemsg;
	}

	/* Allocate and read the message body (+1 for the added NUL). */
	body = msg->body = malloc(msg->hdr.len + 1);
	if (body == NULL)
		goto error_freemsg;
	cleanup_push_heap(body);
	if (!read_all(h->fd, body, msg->hdr.len, 0)) { /* Cancellation point */
		saved_errno = errno;
		goto error_freebody;
	}
	body[msg->hdr.len] = '\0';

	if (msg->hdr.type == XS_WATCH_EVENT) {
		mutex_lock(&h->watch_mutex);
		cleanup_push(pthread_mutex_unlock, &h->watch_mutex);

		/* Kick users out of their select() loop. */
		if (XEN_TAILQ_EMPTY(&h->watch_list) &&
		    (h->watch_pipe[1] != -1))
			while (write(h->watch_pipe[1], body, 1) != 1) /* Cancellation point */
				continue;

		XEN_TAILQ_INSERT_TAIL(&h->watch_list, msg, list);

		condvar_signal(&h->watch_condvar);

		cleanup_pop(1);
	} else {
		mutex_lock(&h->reply_mutex);

		/* There should only ever be one response pending! */
		if (!XEN_TAILQ_EMPTY(&h->reply_list)) {
			mutex_unlock(&h->reply_mutex);
			saved_errno = EEXIST;
			goto error_freebody;
		}

		XEN_TAILQ_INSERT_TAIL(&h->reply_list, msg, list);
		condvar_signal(&h->reply_condvar);

		mutex_unlock(&h->reply_mutex);
	}

	ret = 0;

error_freebody:
	/* On the success path (ret == 0) msg/body are now owned by the
	 * queue, so the heap cleanups below must not free them. */
	cleanup_pop_heap(ret == -1, body);
error_freemsg:
	cleanup_pop_heap(ret == -1, msg);
error:
	errno = saved_errno;

	return ret;
}
/* Path of the xenstored unix-domain socket. */
const char *xs_daemon_socket(void)
{
	const char *path = xenstore_daemon_path();

	return path;
}
/* Read-only socket path; no longer distinct from the read-write one. */
const char *xs_daemon_socket_ro(void)
{
	const char *path = xs_daemon_socket();

	return path;
}
/* Runtime directory used by xenstored. */
const char *xs_daemon_rundir(void)
{
	const char *dir = xenstore_daemon_rundir();

	return dir;
}
/* Public wrapper over xenstore_strings_to_perms(): parse @num permission
 * strings into @perms.  Returns false on parse failure. */
bool xs_strings_to_perms(struct xs_permissions *perms, unsigned int num,
			 const char *strings)
{
	return xenstore_strings_to_perms(perms, num, strings);
}
#ifdef USE_PTHREAD
/*
 * Reader-thread entry point: loop pulling messages off the fd until an
 * error occurs, then close the connection and wake every waiter so they
 * observe h->fd == -1 instead of blocking forever.
 */
static void *read_thread(void *arg)
{
	struct xs_handle *h = arg;
	int fd;

	while (read_message(h, 0) != -1)
		continue;

	/* An error return from read_message leaves the socket in an undefined
	 * state; we might have read only the header and not the message after
	 * it, or (more commonly) the other end has closed the connection.
	 * Since further communication is unsafe, close the socket.
	 */
	fd = h->fd;
	h->fd = -1;
	close(fd);

	/* wake up all waiters */
	pthread_mutex_lock(&h->reply_mutex);
	pthread_cond_broadcast(&h->reply_condvar);
	pthread_mutex_unlock(&h->reply_mutex);

	pthread_mutex_lock(&h->watch_mutex);
	pthread_cond_broadcast(&h->watch_condvar);
	pthread_mutex_unlock(&h->watch_mutex);

	return NULL;
}
#endif
/*
* Local variables:
* mode: C
* c-file-style: "linux"
* indent-tabs-mode: t
* c-basic-offset: 8
* tab-width: 8
* End:
*/