blob: 3f783a0bdec9fd883c5959565c3620ea01618f67 [file] [log] [blame]
/*
* libiio - Library for interfacing industrial I/O (IIO) devices
*
* Copyright (C) 2014 Analog Devices, Inc.
* Author: Paul Cercueil <paul.cercueil@analog.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* */
#include "ops.h"
#include "parser.h"
#include "thread-pool.h"
#include "../debug.h"
#include "../iio-private.h"
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <poll.h>
#include <stdbool.h>
#include <string.h>
#include <sys/eventfd.h>
#include <time.h>
#include <fcntl.h>
#include <signal.h>
int yyparse(yyscan_t scanner);
struct DevEntry;
/* Corresponds to a thread reading from a device */
struct ThdEntry {
	SLIST_ENTRY(ThdEntry) parser_list_entry; /* link in the client's per-parser list */
	SLIST_ENTRY(ThdEntry) dev_list_entry;    /* link in the DevEntry's list of clients */

	/* nb: bytes remaining for the current R/W command;
	 * sample_size: sample size with this client's channel mask;
	 * samples_count: buffer size requested by this client, in samples */
	unsigned int nb, sample_size, samples_count;
	ssize_t err;  /* final status of the R/W command, reported to the client */

	int eventfd;  /* signaled by the R/W thread to wake this client up */

	struct parser_pdata *pdata;
	struct iio_device *dev;
	struct DevEntry *entry;

	uint32_t *mask;  /* channel mask requested by this client */

	/* active: a R/W command is in progress;
	 * is_writer: the current command writes to the device;
	 * new_client: no transfer done yet since open (mask/status sent first);
	 * wait_for_open: still waiting for the R/W thread to open the device */
	bool active, is_writer, new_client, wait_for_open;
};
/* Wake up the client thread sleeping in thd_entry_event_wait(), by
 * incrementing the eventfd counter. Interrupted writes are retried;
 * any other write error is ignored (nothing useful can be done here). */
static void thd_entry_event_signal(struct ThdEntry *thd)
{
	uint64_t counter = 1;
	ssize_t ret;

	for (;;) {
		ret = write(thd->eventfd, &counter, sizeof(counter));
		if (ret != -1 || errno != EINTR)
			break;
	}
}
/* Sleep until the R/W thread signals our eventfd, the client hangs up,
 * or the thread pool requests a stop.
 *
 * Called with *mutex held; the mutex is released while waiting and
 * re-acquired before returning.
 *
 * Returns 0 when the eventfd was signaled, -EPIPE on hang-up/stop.
 */
static int thd_entry_event_wait(struct ThdEntry *thd, pthread_mutex_t *mutex,
	int fd_in)
{
	struct pollfd pfd[3];
	uint64_t e;
	int ret;

	pthread_mutex_unlock(mutex);

	pfd[0].fd = thd->eventfd;
	pfd[0].events = POLLIN;
	pfd[1].fd = fd_in;
	pfd[1].events = POLLRDHUP; /* detect the client closing its connection */
	pfd[2].fd = thread_pool_get_poll_fd(thd->pdata->pool);
	pfd[2].events = POLLIN;

	do {
		poll_nointr(pfd, 3);

		/* Client hung up, or thread pool STOP event: abort the wait */
		if ((pfd[1].revents & POLLRDHUP) || (pfd[2].revents & POLLIN)) {
			pthread_mutex_lock(mutex);
			return -EPIPE;
		}

		/* The eventfd is non-blocking (EFD_NONBLOCK): the read fails
		 * with EAGAIN when poll() woke us for another descriptor */
		do {
			ret = read(thd->eventfd, &e, sizeof(e));
		} while (ret == -1 && errno == EINTR);
	} while (ret == -1 && errno == EAGAIN);

	pthread_mutex_lock(mutex);

	return 0;
}
/* Corresponds to an opened device */
struct DevEntry {
	unsigned int ref_count; /* one ref per client plus one for the R/W thread */

	struct iio_device *dev;
	struct iio_buffer *buf; /* NULL until the R/W thread creates it */
	unsigned int sample_size, nb_clients;
	bool update_mask; /* set when the union of client masks must be recomputed */
	bool cyclic;
	bool closed;      /* set by the R/W thread when it exits */
	bool cancelled;   /* set when the buffer was cancelled (last client left) */

	/* Linked list of ThdEntry structures corresponding
	 * to all the threads who opened the device */
	SLIST_HEAD(ThdHead, ThdEntry) thdlist_head;
	pthread_mutex_t thdlist_lock; /* protects the list and the fields above */

	/* Signaled when a client has queued work for the R/W thread */
	pthread_cond_t rw_ready_cond;

	uint32_t *mask;  /* union of all clients' channel masks */
	size_t nb_words; /* number of 32-bit words in the mask */
};
/* Context passed to the per-sample (de)mux callbacks */
struct sample_cb_info {
	struct parser_pdata *pdata;
	/* nb_bytes: byte budget for the transfer; cpt: bytes handled so far */
	unsigned int nb_bytes, cpt;
	uint32_t *mask; /* channel mask of the client being served */
};
/* Protects iio_device_{set,get}_data() from concurrent access from multiple
* clients */
static pthread_mutex_t devlist_lock = PTHREAD_MUTEX_INITIALIZER;
#if WITH_AIO
/* Perform one asynchronous read or write of at most "len" bytes through the
 * parser's libaio context, blocking until the operation completes or the
 * thread pool signals a stop (in which case the in-flight request is
 * cancelled).
 *
 * Returns the number of bytes transferred, 0 on a STOP event (treated as
 * EOF), or a negative error code.
 */
static ssize_t async_io(struct parser_pdata *pdata, void *buf, size_t len,
	bool do_read)
{
	ssize_t ret;
	struct pollfd pfd[2];
	unsigned int num_pfds;
	struct iocb iocb;
	struct iocb *ios[1];
	struct io_event e[1];

	ios[0] = &iocb;

	if (do_read)
		io_prep_pread(&iocb, pdata->fd_in, buf, len, 0);
	else
		io_prep_pwrite(&iocb, pdata->fd_out, buf, len, 0);

	/* Completion is reported through the parser's AIO eventfd */
	io_set_eventfd(&iocb, pdata->aio_eventfd);

	pthread_mutex_lock(&pdata->aio_mutex);

	ret = io_submit(pdata->aio_ctx, 1, ios);
	if (ret != 1) {
		pthread_mutex_unlock(&pdata->aio_mutex);
		ERROR("Failed to submit IO operation: %zd\n", ret);
		return -EIO;
	}

	pfd[0].fd = pdata->aio_eventfd;
	pfd[0].events = POLLIN;
	pfd[0].revents = 0;
	pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
	pfd[1].events = POLLIN;
	pfd[1].revents = 0;
	num_pfds = 2;

	do {
		poll_nointr(pfd, num_pfds);

		/* The AIO completed (or its cancellation did) */
		if (pfd[0].revents & POLLIN) {
			uint64_t event;
			ret = read(pdata->aio_eventfd, &event, sizeof(event));
			if (ret != sizeof(event)) {
				ERROR("Failed to read from eventfd: %d\n", -errno);
				ret = -EIO;
				break;
			}

			ret = io_getevents(pdata->aio_ctx, 0, 1, e, NULL);
			if (ret != 1) {
				ERROR("Failed to read IO events: %zd\n", ret);
				ret = -EIO;
				break;
			} else {
				ret = (long)e[0].res;
			}
		} else if ((num_pfds > 1 && pfd[1].revents & POLLIN)) {
			/* Got a STOP event to abort this whole session */
			ret = io_cancel(pdata->aio_ctx, &iocb, e);
			if (ret != -EINPROGRESS && ret != -EINVAL) {
				ERROR("Failed to cancel IO transfer: %zd\n", ret);
				ret = -EIO;
				break;
			}
			/* It should not be long now until we get the cancellation event.
			 * Stop polling the thread pool fd and wait for it. */
			num_pfds = 1;
		}
	} while (!(pfd[0].revents & POLLIN));

	pthread_mutex_unlock(&pdata->aio_mutex);

	/* Got STOP event, treat it as EOF */
	if (num_pfds == 1)
		return 0;

	return ret;
}
#define MAX_AIO_REQ_SIZE (1024 * 1024)
/* AIO read entry point: clamp the request size and delegate to async_io() */
static ssize_t readfd_aio(struct parser_pdata *pdata, void *dest, size_t len)
{
	size_t count = len > MAX_AIO_REQ_SIZE ? MAX_AIO_REQ_SIZE : len;

	return async_io(pdata, dest, count, true);
}
/* AIO write entry point: clamp the request size and delegate to async_io().
 * The const is cast away because the iocb API takes a non-const pointer,
 * but the buffer is not modified on the write path. */
static ssize_t writefd_aio(struct parser_pdata *pdata, const void *dest,
	size_t len)
{
	size_t count = len > MAX_AIO_REQ_SIZE ? MAX_AIO_REQ_SIZE : len;

	return async_io(pdata, (void *)dest, count, false);
}
#endif /* WITH_AIO */
/* Read up to "len" bytes from the input descriptor, blocking in poll()
 * until data is available.
 *
 * Returns the number of bytes read, 0 on EOF (client hung up or the
 * thread pool signaled a stop), or a negative error code.
 */
static ssize_t readfd_io(struct parser_pdata *pdata, void *dest, size_t len)
{
	ssize_t ret;
	struct pollfd pfd[2];

	pfd[0].fd = pdata->fd_in;
	pfd[0].events = POLLIN | POLLRDHUP;
	pfd[0].revents = 0;
	pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
	pfd[1].events = POLLIN;
	pfd[1].revents = 0;

	do {
		poll_nointr(pfd, 2);

		/* Got STOP event, or client closed the socket: treat it as EOF */
		if (pfd[1].revents & POLLIN || pfd[0].revents & POLLRDHUP)
			return 0;
		if (pfd[0].revents & POLLERR)
			return -EIO;
		if (!(pfd[0].revents & POLLIN))
			continue;

		/* recv() is used on sockets so that MSG_NOSIGNAL suppresses
		 * SIGPIPE; plain read() otherwise */
		do {
			if (pdata->fd_in_is_socket)
				ret = recv(pdata->fd_in, dest, len, MSG_NOSIGNAL);
			else
				ret = read(pdata->fd_in, dest, len);
		} while (ret == -1 && errno == EINTR);

		/* EAGAIN: spurious wakeup, go back to poll() */
		if (ret != -1 || errno != EAGAIN)
			break;
	} while (true);

	if (ret == -1)
		return -errno;

	return ret;
}
/* Write up to "len" bytes to the output descriptor, blocking in poll()
 * until it is writable.
 *
 * Returns the number of bytes written, 0 on EOF (peer hung up or the
 * thread pool signaled a stop), or a negative error code.
 */
static ssize_t writefd_io(struct parser_pdata *pdata, const void *src, size_t len)
{
	ssize_t ret;
	struct pollfd pfd[2];

	pfd[0].fd = pdata->fd_out;
	pfd[0].events = POLLOUT;
	pfd[0].revents = 0;
	pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
	pfd[1].events = POLLIN;
	pfd[1].revents = 0;

	do {
		poll_nointr(pfd, 2);

		/* Got STOP event, or client closed the socket: treat it as EOF */
		if (pfd[1].revents & POLLIN || pfd[0].revents & POLLHUP)
			return 0;
		if (pfd[0].revents & POLLERR)
			return -EIO;
		if (!(pfd[0].revents & POLLOUT))
			continue;

		/* send() with MSG_NOSIGNAL on sockets to avoid SIGPIPE */
		do {
			if (pdata->fd_out_is_socket)
				ret = send(pdata->fd_out, src, len, MSG_NOSIGNAL);
			else
				ret = write(pdata->fd_out, src, len);
		} while (ret == -1 && errno == EINTR);

		/* EAGAIN: spurious wakeup, go back to poll() */
		if (ret != -1 || errno != EAGAIN)
			break;
	} while (true);

	if (ret == -1)
		return -errno;

	return ret;
}
/* Write the whole buffer, looping over partial writes.
 * Returns "len" on success, -EPIPE if the peer hung up mid-transfer,
 * or a negative error code. */
ssize_t write_all(struct parser_pdata *pdata, const void *src, size_t len)
{
	const char *pos = src;
	size_t left = len;

	while (left) {
		ssize_t ret = pdata->writefd(pdata, (void *) pos, left);

		if (ret < 0)
			return ret;
		if (ret == 0)
			return -EPIPE;

		pos += ret;
		left -= ret;
	}

	return pos - (const char *) src;
}
/* Read exactly "len" bytes, looping over partial reads.
 * Returns "len" on success, -EPIPE on EOF mid-transfer, or a negative
 * error code. */
static ssize_t read_all(struct parser_pdata *pdata,
		void *dst, size_t len)
{
	char *pos = dst;
	size_t left = len;

	while (left) {
		ssize_t ret = pdata->readfd(pdata, pos, left);

		if (ret < 0)
			return ret;
		if (ret == 0)
			return -EPIPE;

		pos += ret;
		left -= ret;
	}

	return pos - (char *) dst;
}
/* Report a status code to the client: the raw number followed by '\n',
 * or, in verbose mode, a human-readable message for negative values. */
static void print_value(struct parser_pdata *pdata, long value)
{
	char buf[1024];

	if (pdata->verbose && value < 0) {
		iio_strerror(-value, buf, sizeof(buf));
		output(pdata, "ERROR: ");
		output(pdata, buf);
		output(pdata, "\n");
		return;
	}

	sprintf(buf, "%li\n", value);
	output(pdata, buf);
}
/* iio_buffer_foreach_sample() callback used when demultiplexing: forward
 * one sample to the client if its channel is in the client's mask,
 * inserting zero padding bytes to keep each sample aligned to its size.
 * Returns bytes written for this sample, 0 to skip it, or a negative
 * error code. */
static ssize_t send_sample(const struct iio_channel *chn,
		void *src, size_t length, void *d)
{
	struct sample_cb_info *info = d;

	/* Skip channels the client did not enable */
	if (chn->index < 0 || !TEST_BIT(info->mask, chn->index))
		return 0;
	/* Stop once the remaining byte budget cannot hold another sample */
	if (info->nb_bytes < length)
		return 0;

	/* Align the output stream to this sample's size by sending zeros */
	if (info->cpt % length) {
		unsigned int i, goal = length - info->cpt % length;
		char zero = 0;
		ssize_t ret;

		for (i = 0; i < goal; i++) {
			ret = info->pdata->writefd(info->pdata, &zero, 1);
			if (ret < 0)
				return ret;
		}
		info->cpt += goal;
	}

	info->cpt += length;
	info->nb_bytes -= length;

	return write_all(info->pdata, src, length);
}
/* iio_buffer_foreach_sample() callback used when muxing client data into
 * the device buffer: fill one sample if its channel is in the client's
 * mask, discarding the alignment padding bytes sent by the client.
 * Returns bytes read for this sample, 0 to skip it, or a negative error
 * code. */
static ssize_t receive_sample(const struct iio_channel *chn,
		void *dst, size_t length, void *d)
{
	struct sample_cb_info *info = d;

	if (chn->index < 0 || !TEST_BIT(info->mask, chn->index))
		return 0;
	/* The whole transfer has been consumed already */
	if (info->cpt == info->nb_bytes)
		return 0;

	/* Skip the padding if needed */
	if (info->cpt % length) {
		unsigned int i, goal = length - info->cpt % length;
		char foo;
		ssize_t ret;

		for (i = 0; i < goal; i++) {
			ret = info->pdata->readfd(info->pdata, &foo, 1);
			if (ret < 0)
				return ret;
		}
		info->cpt += goal;
	}

	info->cpt += length;
	return read_all(info->pdata, dst, length);
}
/* Send up to "len" bytes of captured data to one client. When the server
 * demuxes (server_demux enabled and client mask differs from the device
 * mask), only the client's channels are forwarded. The byte count is sent
 * first; on the first transfer after open the effective channel mask is
 * sent as well. */
static ssize_t send_data(struct DevEntry *dev, struct ThdEntry *thd, size_t len)
{
	struct parser_pdata *pdata = thd->pdata;
	bool demux = server_demux && dev->sample_size != thd->sample_size;

	/* Round down to a whole number of client-sized samples */
	if (demux)
		len = (len / dev->sample_size) * thd->sample_size;
	if (len > thd->nb)
		len = thd->nb;

	print_value(pdata, len);

	if (thd->new_client) {
		unsigned int i;
		char buf[129], *ptr = buf;
		uint32_t *mask = demux ? thd->mask : dev->mask;
		ssize_t ret;

		/* Send the current mask (most significant word first) */
		for (i = dev->nb_words; i > 0 && ptr < buf + sizeof(buf);
				i--, ptr += 8)
			sprintf(ptr, "%08x", mask[i - 1]);

		*ptr = '\n';
		ret = write_all(pdata, buf, ptr + 1 - buf);
		if (ret < 0)
			return ret;

		thd->new_client = false;
	}

	if (!demux) {
		/* Short path */
		return write_all(pdata, dev->buf->buffer, len);
	} else {
		struct sample_cb_info info = {
			.pdata = pdata,
			.cpt = 0,
			.nb_bytes = len,
			.mask = thd->mask,
		};

		return iio_buffer_foreach_sample(dev->buf, send_sample, &info);
	}
}
/* Receive data from one client into the device buffer. If the client's
 * sample size matches the device's, bytes are read straight into the
 * buffer; otherwise each sample is muxed in individually. */
static ssize_t receive_data(struct DevEntry *dev, struct ThdEntry *thd)
{
	struct parser_pdata *pdata = thd->pdata;

	/* Inform that no error occured, and that we'll start reading data */
	if (thd->new_client) {
		print_value(thd->pdata, 0);
		thd->new_client = false;
	}

	if (dev->sample_size == thd->sample_size) {
		/* Short path: Receive directly in the buffer */

		size_t len = dev->buf->length;
		if (thd->nb < len)
			len = thd->nb;

		return read_all(pdata, dev->buf->buffer, len);
	} else {
		/* Long path: Mux the samples to the buffer */

		struct sample_cb_info info = {
			.pdata = pdata,
			.cpt = 0,
			.nb_bytes = thd->nb,
			.mask = thd->mask,
		};

		return iio_buffer_foreach_sample(dev->buf,
				receive_sample, &info);
	}
}
/* Drop one reference to the DevEntry and destroy it when the last
 * reference is gone. */
static void dev_entry_put(struct DevEntry *entry)
{
	unsigned int remaining;

	pthread_mutex_lock(&entry->thdlist_lock);
	remaining = --entry->ref_count;
	pthread_mutex_unlock(&entry->thdlist_lock);

	if (!remaining) {
		pthread_mutex_destroy(&entry->thdlist_lock);
		pthread_cond_destroy(&entry->rw_ready_cond);
		free(entry->mask);
		free(entry);
	}
}
/* Complete the client's pending command with status "ret" and wake the
 * client thread up. Callers hold entry->thdlist_lock. */
static void signal_thread(struct ThdEntry *thd, ssize_t ret)
{
	thd->active = false;
	thd->nb = 0;
	thd->err = ret;
	thd_entry_event_signal(thd);
}
/* Device R/W worker, running in the thread pool; one instance per opened
 * device. It owns the iio_buffer, rebuilds it whenever the set of clients
 * (and thus the combined channel mask) changes, and shuttles data between
 * the buffer and the active clients. Exits when the client list becomes
 * empty or on a fatal buffer error. */
static void rw_thd(struct thread_pool *pool, void *d)
{
	struct DevEntry *entry = d;
	struct ThdEntry *thd, *next_thd;
	struct iio_device *dev = entry->dev;
	unsigned int nb_words = entry->nb_words;
	ssize_t ret = 0;

	DEBUG("R/W thread started for device %s\n",
			dev->name ? dev->name : dev->id);

	while (true) {
		bool has_readers = false, has_writers = false,
		     mask_updated = false;
		unsigned int sample_size;

		/* NOTE: this while loop must exit with thdlist_lock locked. */
		pthread_mutex_lock(&entry->thdlist_lock);

		if (SLIST_EMPTY(&entry->thdlist_head))
			break;

		if (entry->update_mask) {
			unsigned int i;
			unsigned int samples_count = 0;

			/* Recompute the union of the clients' masks and the
			 * largest requested buffer size */
			memset(entry->mask, 0, nb_words * sizeof(*entry->mask));
			SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
				for (i = 0; i < nb_words; i++)
					entry->mask[i] |= thd->mask[i];

				if (thd->samples_count > samples_count)
					samples_count = thd->samples_count;
			}

			/* Re-create the buffer with the channels of the new
			 * combined mask enabled */
			if (entry->buf)
				iio_buffer_destroy(entry->buf);

			for (i = 0; i < dev->nb_channels; i++) {
				struct iio_channel *chn = dev->channels[i];
				long index = chn->index;

				if (index >= 0 && TEST_BIT(entry->mask, i))
					iio_channel_enable(chn);
				else
					iio_channel_disable(chn);
			}

			entry->buf = iio_device_create_buffer(dev,
					samples_count, entry->cyclic);
			if (!entry->buf) {
				ret = -errno;
				ERROR("Unable to create buffer\n");
				break;
			}
			entry->cancelled = false;

			/* Signal the threads that we opened the device */
			SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
				if (thd->wait_for_open) {
					thd->wait_for_open = false;
					signal_thread(thd, 0);
				}
			}

			DEBUG("IIO device %s reopened with new mask:\n",
					dev->id);
			for (i = 0; i < nb_words; i++)
				DEBUG("Mask[%i] = 0x%08x\n", i, entry->mask[i]);
			entry->update_mask = false;

			entry->sample_size = iio_device_get_sample_size(dev);
			mask_updated = true;
		}

		sample_size = entry->sample_size;

		/* Determine which clients have pending work */
		SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
			thd->active = !thd->err && thd->nb >= sample_size;
			if (mask_updated && thd->active)
				signal_thread(thd, thd->nb);

			if (thd->is_writer)
				has_writers |= thd->active;
			else
				has_readers |= thd->active;
		}

		/* Nothing to do: wait for a client to queue a command */
		if (!has_readers && !has_writers) {
			pthread_cond_wait(&entry->rw_ready_cond,
					&entry->thdlist_lock);
		}

		pthread_mutex_unlock(&entry->thdlist_lock);

		if (!has_readers && !has_writers)
			continue;

		if (has_readers) {
			ssize_t nb_bytes;

			/* Refill is done unlocked: it may block for a while */
			ret = iio_buffer_refill(entry->buf);

			pthread_mutex_lock(&entry->thdlist_lock);

			/*
			 * When the last client disconnects the buffer is
			 * cancelled and iio_buffer_refill() returns an error. A
			 * new client might have connected before we got here
			 * though, in that case the rw thread has to stay active
			 * and a new buffer is created. If the list is still empty the loop
			 * will exit normally.
			 */
			if (entry->cancelled) {
				pthread_mutex_unlock(&entry->thdlist_lock);
				continue;
			}

			if (ret < 0) {
				ERROR("Reading from device failed: %i\n",
						(int) ret);
				break;
			}

			nb_bytes = ret;

			/* We don't use SLIST_FOREACH here. As soon as a thread is
			 * signaled, its "thd" structure might be freed;
			 * SLIST_FOREACH would then cause a segmentation fault, as it
			 * reads "thd" to get the address of the next element. */
			for (thd = SLIST_FIRST(&entry->thdlist_head);
					thd; thd = next_thd) {
				next_thd = SLIST_NEXT(thd, dev_list_entry);

				if (!thd->active || thd->is_writer)
					continue;

				ret = send_data(entry, thd, nb_bytes);
				if (ret > 0)
					thd->nb -= ret;

				/* Completed (or failed): wake the client */
				if (ret < 0 || thd->nb < sample_size)
					signal_thread(thd, (ret < 0) ?
							ret : thd->nb);
			}

			pthread_mutex_unlock(&entry->thdlist_lock);
		}

		if (has_writers) {
			ssize_t nb_bytes = 0;

			pthread_mutex_lock(&entry->thdlist_lock);

			/* Reset the size of the buffer to its maximum size */
			entry->buf->data_length = entry->buf->length;

			/* Same comment as above */
			for (thd = SLIST_FIRST(&entry->thdlist_head);
					thd; thd = next_thd) {
				next_thd = SLIST_NEXT(thd, dev_list_entry);

				if (!thd->active || !thd->is_writer)
					continue;

				ret = receive_data(entry, thd);
				if (ret > 0) {
					thd->nb -= ret;
					if (ret > nb_bytes)
						nb_bytes = ret;
				}

				if (ret < 0)
					signal_thread(thd, ret);
			}

			ret = iio_buffer_push_partial(entry->buf,
					nb_bytes / sample_size);
			if (entry->cancelled) {
				pthread_mutex_unlock(&entry->thdlist_lock);
				continue;
			}
			if (ret < 0) {
				ERROR("Writing to device failed: %i\n",
						(int) ret);
				break;
			}

			/* Signal threads which completed their RW command */
			for (thd = SLIST_FIRST(&entry->thdlist_head);
					thd; thd = next_thd) {
				next_thd = SLIST_NEXT(thd, dev_list_entry);
				if (thd->active && thd->is_writer &&
						thd->nb < sample_size)
					signal_thread(thd, thd->nb);
			}

			pthread_mutex_unlock(&entry->thdlist_lock);
		}
	}

	/* The loop above exited with thdlist_lock held:
	 * signal all remaining threads */
	for (thd = SLIST_FIRST(&entry->thdlist_head); thd; thd = next_thd) {
		next_thd = SLIST_NEXT(thd, dev_list_entry);
		SLIST_REMOVE(&entry->thdlist_head, thd, ThdEntry, dev_list_entry);
		thd->wait_for_open = false;
		signal_thread(thd, ret);
	}

	if (entry->buf) {
		iio_buffer_destroy(entry->buf);
		entry->buf = NULL;
	}

	entry->closed = true;
	pthread_mutex_unlock(&entry->thdlist_lock);

	pthread_mutex_lock(&devlist_lock);
	/* It is possible that a new thread has already started, make sure to
	 * not overwrite it. */
	if (iio_device_get_data(dev) == entry)
		iio_device_set_data(dev, NULL);
	pthread_mutex_unlock(&devlist_lock);

	DEBUG("Stopping R/W thread for device %s\n",
			dev->name ? dev->name : dev->id);

	dev_entry_put(entry);
}
static struct ThdEntry *parser_lookup_thd_entry(struct parser_pdata *pdata,
struct iio_device *dev)
{
struct ThdEntry *t;
SLIST_FOREACH(t, &pdata->thdlist_head, parser_list_entry) {
if (t->dev == dev)
return t;
}
return NULL;
}
/* Handle a client READBUF/WRITEBUF request: queue "nb" bytes of work on
 * the client's ThdEntry, wake the R/W thread, and sleep until the
 * transfer completes or the connection drops.
 *
 * Returns the number of bytes transferred, or a negative error code. */
static ssize_t rw_buffer(struct parser_pdata *pdata,
		struct iio_device *dev, unsigned int nb, bool is_write)
{
	struct DevEntry *entry;
	struct ThdEntry *thd;
	ssize_t ret;

	if (!dev)
		return -ENODEV;

	thd = parser_lookup_thd_entry(pdata, dev);
	if (!thd)
		return -EBADF;

	entry = thd->entry;

	/* Requests smaller than one sample transfer nothing */
	if (nb < entry->sample_size)
		return 0;

	pthread_mutex_lock(&entry->thdlist_lock);
	if (entry->closed) {
		pthread_mutex_unlock(&entry->thdlist_lock);
		return -EBADF;
	}

	/* A transfer is already in flight for this client */
	if (thd->nb) {
		pthread_mutex_unlock(&entry->thdlist_lock);
		return -EBUSY;
	}

	thd->new_client = true;
	thd->nb = nb;
	thd->err = 0;
	thd->is_writer = is_write;
	thd->active = true;

	/* Wake up the R/W thread */
	pthread_cond_signal(&entry->rw_ready_cond);

	DEBUG("Waiting for completion...\n");
	/* "active" was just set under the lock, so the loop body runs at
	 * least once and "ret" is always initialized. */
	while (thd->active) {
		ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
		if (ret)
			break;
	}
	if (ret == 0)
		ret = thd->err;
	pthread_mutex_unlock(&entry->thdlist_lock);

	/* Partial transfer: send a zero byte count so the client stops
	 * expecting more data */
	if (ret > 0 && ret < nb)
		print_value(thd->pdata, 0);

	DEBUG("Exiting rw_buffer with code %li\n", (long) ret);

	if (ret < 0)
		return ret;
	else
		return nb - ret;
}
/* Parse an ASCII hex channel-mask string (8 hex characters per 32-bit
 * word, most significant word first) into a freshly allocated array.
 *
 * On input *len is the length of the string; on output it is set to the
 * number of 32-bit words. Returns the array (owned by the caller) or
 * NULL if the allocation fails.
 */
static uint32_t *get_mask(const char *mask, size_t *len)
{
	size_t nb = (*len + 7) / 8;
	uint32_t *ptr, *words = calloc(nb, sizeof(*words));
	if (!words)
		return NULL;

	/* The first 8 characters correspond to the highest word */
	ptr = words + nb;
	while (*mask) {
		char buf[9];

		sprintf(buf, "%.*s", 8, mask);
		sscanf(buf, "%08x", --ptr);
		mask += 8;
		/* Fix: the word index is (ptr - words); the previous
		 * (words - ptr) / 4 printed a bogus huge value */
		DEBUG("Mask[%lu]: 0x%08x\n",
				(unsigned long) (ptr - words), *ptr);
	}

	*len = nb;
	return words;
}
/* Release the resources owned by a ThdEntry; the entry must already have
 * been removed from all lists by the caller. */
static void free_thd_entry(struct ThdEntry *t)
{
	close(t->eventfd);
	free(t->mask);
	free(t);
}
/* Detach a ThdEntry from its DevEntry and free it. If this was the last
 * client, cancel the buffer so the blocked R/W thread wakes up and can
 * exit. */
static void remove_thd_entry(struct ThdEntry *t)
{
	struct DevEntry *entry = t->entry;

	pthread_mutex_lock(&entry->thdlist_lock);
	if (!entry->closed) {
		entry->update_mask = true;
		SLIST_REMOVE(&entry->thdlist_head, t, ThdEntry, dev_list_entry);
		if (SLIST_EMPTY(&entry->thdlist_head) && entry->buf) {
			entry->cancelled = true;
			iio_buffer_cancel(entry->buf); /* Wakeup the rw thread */
		}

		pthread_cond_signal(&entry->rw_ready_cond);
	}
	pthread_mutex_unlock(&entry->thdlist_lock);
	dev_entry_put(entry);

	free_thd_entry(t);
}
/* Open the device for this client: register a ThdEntry with the device's
 * DevEntry (creating the DevEntry and its R/W thread if the device was
 * not yet open), then wait until the R/W thread has created the buffer
 * with the combined channel mask.
 *
 * "mask" is an ASCII hex string with one 8-character group per 32
 * channels. Returns 0 on success or a negative error code.
 */
static int open_dev_helper(struct parser_pdata *pdata, struct iio_device *dev,
		size_t samples_count, const char *mask, bool cyclic)
{
	int ret = -ENOMEM;
	struct DevEntry *entry;
	struct ThdEntry *thd;
	size_t len = strlen(mask);
	uint32_t *words;
	unsigned int nb_channels;
	unsigned int cyclic_retry = 500;

	if (!dev)
		return -ENODEV;

	nb_channels = dev->nb_channels;
	if (len != ((nb_channels + 31) / 32) * 8)
		return -EINVAL;

	words = get_mask(mask, &len);
	if (!words)
		return -ENOMEM;

	thd = zalloc(sizeof(*thd));
	if (!thd)
		goto err_free_words;

	thd->wait_for_open = true;
	thd->mask = words;
	thd->nb = 0;
	thd->samples_count = samples_count;
	thd->sample_size = iio_device_get_sample_size_mask(dev, words, len);
	thd->pdata = pdata;
	thd->dev = dev;
	thd->eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
	if (thd->eventfd < 0) {
		/* Fix: previously unchecked; a failed eventfd() would later
		 * make us poll()/write() on fd -1 */
		ret = -errno;
		free(thd);
		goto err_free_words;
	}

retry:
	/* Atomically look up the thread and make sure that it is still active
	 * or allocate new one. */
	pthread_mutex_lock(&devlist_lock);
	entry = iio_device_get_data(dev);
	if (entry) {
		if (cyclic || entry->cyclic) {
			/* Only one client allowed in cyclic mode */
			pthread_mutex_unlock(&devlist_lock);

			/* There is an inherent race condition if a client
			 * creates a new cyclic buffer shortly after destroying
			 * a previous. E.g. like
			 *
			 * iio_buffer_destroy(buf);
			 * buf = iio_device_create_buffer(dev, n, true);
			 *
			 * In this case the two buffers each use their own
			 * communication channel which are unordered to each
			 * other. E.g. the socket open might arrive before the
			 * socket close on the host side, even though they were
			 * sent in the opposite order on the client side. This
			 * race condition can cause an error being reported back
			 * to the client, even though the code on the client
			 * side was well formed and would work fine e.g. using
			 * the local backend.
			 *
			 * To avoid this issue go to sleep for up to 50ms in
			 * intervals of 100us. This should be enough time for
			 * the issue to resolve itself. If there actually is
			 * contention on the buffer an error will eventually be
			 * returned in which case the additional delay cause by
			 * the retires should not matter too much.
			 *
			 * This is not pretty but it works.
			 */
			if (cyclic_retry) {
				cyclic_retry--;
				usleep(100);
				goto retry;
			}

			ret = -EBUSY;
			goto err_free_thd;
		}

		pthread_mutex_lock(&entry->thdlist_lock);
		if (!entry->closed) {
			pthread_mutex_unlock(&devlist_lock);

			entry->ref_count++;

			SLIST_INSERT_HEAD(&entry->thdlist_head, thd, dev_list_entry);
			thd->entry = entry;
			entry->update_mask = true;
			DEBUG("Added thread to client list\n");

			/* Tell the R/W thread to rebuild its buffer */
			pthread_cond_signal(&entry->rw_ready_cond);

			/* Wait until the device is opened by the rw thread */
			while (thd->wait_for_open) {
				ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
				if (ret)
					break;
			}
			pthread_mutex_unlock(&entry->thdlist_lock);

			if (ret == 0)
				ret = (int) thd->err;
			if (ret < 0)
				remove_thd_entry(thd);
			else
				SLIST_INSERT_HEAD(&pdata->thdlist_head, thd, parser_list_entry);
			return ret;
		} else {
			/* The R/W thread is exiting: create a fresh entry */
			pthread_mutex_unlock(&entry->thdlist_lock);
		}
	}

	entry = zalloc(sizeof(*entry));
	if (!entry) {
		pthread_mutex_unlock(&devlist_lock);
		goto err_free_thd;
	}

	entry->ref_count = 2; /* One for thread, one for the client */

	entry->mask = malloc(len * sizeof(*words));
	if (!entry->mask) {
		pthread_mutex_unlock(&devlist_lock);
		goto err_free_entry;
	}

	entry->cyclic = cyclic;
	entry->nb_words = len;
	entry->update_mask = true;
	entry->dev = dev;
	entry->buf = NULL;
	SLIST_INIT(&entry->thdlist_head);
	SLIST_INSERT_HEAD(&entry->thdlist_head, thd, dev_list_entry);
	thd->entry = entry;
	DEBUG("Added thread to client list\n");

	pthread_mutex_init(&entry->thdlist_lock, NULL);
	pthread_cond_init(&entry->rw_ready_cond, NULL);

	ret = thread_pool_add_thread(main_thread_pool, rw_thd, entry, "rw_thd");
	if (ret) {
		pthread_mutex_unlock(&devlist_lock);
		goto err_free_entry_mask;
	}

	DEBUG("Adding new device thread to device list\n");
	iio_device_set_data(dev, entry);
	pthread_mutex_unlock(&devlist_lock);

	pthread_mutex_lock(&entry->thdlist_lock);
	/* Wait until the device is opened by the rw thread */
	while (thd->wait_for_open) {
		ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
		if (ret)
			break;
	}
	pthread_mutex_unlock(&entry->thdlist_lock);

	if (ret == 0)
		ret = (int) thd->err;
	if (ret < 0)
		remove_thd_entry(thd);
	else
		SLIST_INSERT_HEAD(&pdata->thdlist_head, thd, parser_list_entry);
	return ret;

err_free_entry_mask:
	free(entry->mask);
err_free_entry:
	free(entry);
err_free_thd:
	close(thd->eventfd);
	free(thd);
err_free_words:
	free(words);
	return ret;
}
/* Close the device previously opened by this client.
 * Returns 0 on success, -ENODEV if dev is NULL, or -ENXIO if the client
 * never opened it. */
static int close_dev_helper(struct parser_pdata *pdata, struct iio_device *dev)
{
	struct ThdEntry *entry;

	if (!dev)
		return -ENODEV;

	entry = parser_lookup_thd_entry(pdata, dev);
	if (!entry)
		return -ENXIO;

	SLIST_REMOVE(&pdata->thdlist_head, entry, ThdEntry, parser_list_entry);
	remove_thd_entry(entry);

	return 0;
}
/* Protocol handler for the OPEN command: open the device and report the
 * status code back to the client. */
int open_dev(struct parser_pdata *pdata, struct iio_device *dev,
		size_t samples_count, const char *mask, bool cyclic)
{
	int ret;

	ret = open_dev_helper(pdata, dev, samples_count, mask, cyclic);
	print_value(pdata, ret);

	return ret;
}
/* Protocol handler for the CLOSE command: close the device and report the
 * status code back to the client. */
int close_dev(struct parser_pdata *pdata, struct iio_device *dev)
{
	int ret;

	ret = close_dev_helper(pdata, dev);
	print_value(pdata, ret);

	return ret;
}
/* Protocol handler for the READBUF/WRITEBUF commands. For successful
 * reads the byte count is emitted during the transfer itself (see
 * send_data()), so only errors and writes get a status line here. */
ssize_t rw_dev(struct parser_pdata *pdata, struct iio_device *dev,
		unsigned int nb, bool is_write)
{
	ssize_t ret = rw_buffer(pdata, dev, nb, is_write);

	if (is_write || ret <= 0)
		print_value(pdata, ret);

	return ret;
}
/* Protocol handler: read a device, debug or buffer attribute and send the
 * status code followed by the newline-terminated value to the client.
 * Returns the attribute length, or a negative error code. */
ssize_t read_dev_attr(struct parser_pdata *pdata, struct iio_device *dev,
		const char *attr, enum iio_attr_type type)
{
	/* We use a very large buffer here, as if attr is NULL all the
	 * attributes will be read, which may represents a few kilobytes worth
	 * of data. */
	char buf[0x10000];
	ssize_t ret = -EINVAL;

	if (!dev) {
		print_value(pdata, -ENODEV);
		return -ENODEV;
	}

	switch (type) {
		case IIO_ATTR_TYPE_DEVICE:
			ret = iio_device_attr_read(dev, attr, buf, sizeof(buf) - 1);
			break;
		case IIO_ATTR_TYPE_DEBUG:
			ret = iio_device_debug_attr_read(dev,
					attr, buf, sizeof(buf) - 1);
			break;
		case IIO_ATTR_TYPE_BUFFER:
			ret = iio_device_buffer_attr_read(dev,
					attr, buf, sizeof(buf) - 1);
			break;
		default:
			ret = -EINVAL;
			break;
	}

	print_value(pdata, ret);
	if (ret < 0)
		return ret;

	/* Terminate the value with a newline and send it */
	buf[ret] = '\n';
	return write_all(pdata, buf, ret + 1);
}
/* Protocol handler: read "len" bytes of attribute data from the client
 * and write them to a device, debug or buffer attribute. The status code
 * is always reported back to the client.
 * Returns the underlying write result, or a negative error code. */
ssize_t write_dev_attr(struct parser_pdata *pdata, struct iio_device *dev,
		const char *attr, size_t len, enum iio_attr_type type)
{
	ssize_t ret = -ENOMEM;
	char *buf;

	if (!dev) {
		ret = -ENODEV;
		goto err_print_value;
	}

	/* malloc(0) may legitimately return NULL on some implementations;
	 * request at least one byte so a zero-length write is not
	 * misreported as -ENOMEM */
	buf = malloc(len ? len : 1);
	if (!buf)
		goto err_print_value;

	ret = read_all(pdata, buf, len);
	if (ret < 0)
		goto err_free_buffer;

	switch (type) {
		case IIO_ATTR_TYPE_DEVICE:
			ret = iio_device_attr_write_raw(dev, attr, buf, len);
			break;
		case IIO_ATTR_TYPE_DEBUG:
			ret = iio_device_debug_attr_write_raw(dev, attr, buf, len);
			break;
		case IIO_ATTR_TYPE_BUFFER:
			ret = iio_device_buffer_attr_write_raw(dev, attr, buf, len);
			break;
		default:
			ret = -EINVAL;
			break;
	}

err_free_buffer:
	free(buf);
err_print_value:
	print_value(pdata, ret);
	return ret;
}
/* Protocol handler: read a channel attribute and send the status code
 * followed by the newline-terminated value to the client. */
ssize_t read_chn_attr(struct parser_pdata *pdata,
		struct iio_channel *chn, const char *attr)
{
	char buf[1024];
	ssize_t ret;

	if (chn)
		ret = iio_channel_attr_read(chn, attr, buf, sizeof(buf) - 1);
	else
		ret = pdata->dev ? -ENXIO : -ENODEV;

	print_value(pdata, ret);
	if (ret < 0)
		return ret;

	buf[ret] = '\n';
	return write_all(pdata, buf, ret + 1);
}
/* Protocol handler: read "len" bytes of attribute data from the client
 * and write them to a channel attribute. The status code is always
 * reported back to the client. */
ssize_t write_chn_attr(struct parser_pdata *pdata,
		struct iio_channel *chn, const char *attr, size_t len)
{
	char *buf = malloc(len);
	ssize_t ret;

	if (!buf) {
		ret = -ENOMEM;
		goto out_print;
	}

	ret = read_all(pdata, buf, len);
	if (ret >= 0) {
		if (chn)
			ret = iio_channel_attr_write_raw(chn, attr, buf, len);
		else
			ret = pdata->dev ? -ENXIO : -ENODEV;
	}

	free(buf);
out_print:
	print_value(pdata, ret);
	return ret;
}
/* Protocol handler: associate the named trigger with the device, or
 * dissociate the current one when "trigger" is NULL. The status code is
 * always reported back to the client. */
ssize_t set_trigger(struct parser_pdata *pdata,
		struct iio_device *dev, const char *trigger)
{
	struct iio_device *trig = NULL;
	ssize_t ret;

	if (!dev) {
		ret = -ENODEV;
		goto out_print;
	}

	if (trigger) {
		trig = iio_context_find_device(pdata->ctx, trigger);
		if (!trig) {
			ret = -ENOENT;
			goto out_print;
		}
	}

	ret = iio_device_set_trigger(dev, trig);
out_print:
	print_value(pdata, ret);
	return ret;
}
/* Protocol handler: report the device's current trigger. Sends the name
 * length (or error code) followed by the name and a newline.
 *
 * Fix: the previous code sent strlen(name) + 1 bytes out of a 256-byte
 * buffer that snprintf may have truncated — for very long trigger names
 * this read past the end of the buffer and advertised a length that did
 * not match the data actually sent. The length is now clamped so it
 * always matches what is transmitted.
 */
ssize_t get_trigger(struct parser_pdata *pdata, struct iio_device *dev)
{
	const struct iio_device *trigger;
	ssize_t ret;

	if (!dev) {
		print_value(pdata, -ENODEV);
		return -ENODEV;
	}

	ret = iio_device_get_trigger(dev, &trigger);
	if (!ret && trigger) {
		char buf[256];
		size_t len = strlen(trigger->name);

		/* Keep room for the trailing '\n' */
		if (len > sizeof(buf) - 2)
			len = sizeof(buf) - 2;

		print_value(pdata, (long) len);

		memcpy(buf, trigger->name, len);
		buf[len] = '\n';
		ret = write_all(pdata, buf, len + 1);
	} else {
		print_value(pdata, ret);
	}

	return ret;
}
/* Protocol handler: update the context's I/O timeout and report the
 * status code back to the client. */
int set_timeout(struct parser_pdata *pdata, unsigned int timeout)
{
	int ret;

	ret = iio_context_set_timeout(pdata->ctx, timeout);
	print_value(pdata, ret);

	return ret;
}
/* Protocol handler: set the number of kernel buffers for the device
 * ("value" must be >= 1) and report the status code to the client. */
int set_buffers_count(struct parser_pdata *pdata,
		struct iio_device *dev, long value)
{
	int ret;

	if (!dev)
		ret = -ENODEV;
	else if (value < 1)
		ret = -EINVAL;
	else
		ret = iio_device_set_kernel_buffers_count(dev,
				(unsigned int) value);

	print_value(pdata, ret);
	return ret;
}
/* Read one command line from the client.
 *
 * On sockets the data is first peeked (MSG_PEEK) so that only the bytes
 * up to and including the '\n' are consumed from the stream; the consumed
 * bytes are then discarded with recv(NULL, n, MSG_TRUNC).
 * NOTE(review): this relies on Linux MSG_TRUNC semantics for stream
 * sockets (discard n bytes) — not portable beyond Linux.
 * On non-socket inputs a plain read is performed instead.
 *
 * Returns the number of bytes consumed, 0 on EOF/STOP, or a negative
 * error code (-EIO when the buffer filled up without finding a '\n').
 */
ssize_t read_line(struct parser_pdata *pdata, char *buf, size_t len)
{
	ssize_t ret;

	if (pdata->fd_in_is_socket) {
		struct pollfd pfd[2];
		bool found;
		size_t bytes_read = 0;

		pfd[0].fd = pdata->fd_in;
		pfd[0].events = POLLIN | POLLRDHUP;
		pfd[0].revents = 0;
		pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
		pfd[1].events = POLLIN;
		pfd[1].revents = 0;

		do {
			size_t i, to_trunc;

			poll_nointr(pfd, 2);

			/* Client hung up, or STOP event: report EOF */
			if (pfd[1].revents & POLLIN ||
					pfd[0].revents & POLLRDHUP)
				return 0;

			/* First read from the socket, without advancing the
			 * read offset */
			ret = recv(pdata->fd_in, buf, len,
					MSG_NOSIGNAL | MSG_PEEK);
			if (ret < 0)
				return -errno;

			/* Lookup for the trailing \n */
			for (i = 0; i < (size_t) ret && buf[i] != '\n'; i++);
			found = i < (size_t) ret;

			len -= ret;
			buf += ret;

			to_trunc = found ? i + 1 : (size_t) ret;

			/* Advance the read offset after the \n if found, or
			 * after the last character read otherwise */
			ret = recv(pdata->fd_in, NULL, to_trunc,
					MSG_NOSIGNAL | MSG_TRUNC);
			if (ret < 0)
				return -errno;

			bytes_read += to_trunc;
		} while (!found && len);

		/* No \n found? Just garbage data */
		if (!found)
			ret = -EIO;
		else
			ret = bytes_read;
	} else {
		ret = pdata->readfd(pdata, buf, len);
	}

	return ret;
}
/* Per-connection interpreter loop: parse and execute commands coming from
 * fd_in, writing responses to fd_out, until EOF, a parse abort, or a stop
 * request. Any device the client left open is closed on exit.
 *
 * When use_aio is true (only possible when built WITH_AIO — iiod.c
 * guarantees this), I/O goes through the libaio-based handlers; otherwise
 * plain poll()+read()/write() is used.
 */
void interpreter(struct iio_context *ctx, int fd_in, int fd_out, bool verbose,
	bool is_socket, bool use_aio, struct thread_pool *pool)
{
	yyscan_t scanner;
	struct parser_pdata pdata;
	unsigned int i;
	int ret;

	pdata.ctx = ctx;
	pdata.stop = false;
	pdata.fd_in = fd_in;
	pdata.fd_out = fd_out;
	pdata.verbose = verbose;
	pdata.pool = pool;

	pdata.fd_in_is_socket = is_socket;
	pdata.fd_out_is_socket = is_socket;

	SLIST_INIT(&pdata.thdlist_head);

	if (use_aio) {
		/* Note: if WITH_AIO is not defined, use_aio is always false.
		 * We ensure that in iiod.c. */
#if WITH_AIO
		char err_str[1024];

		pdata.aio_eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
		if (pdata.aio_eventfd < 0) {
			iio_strerror(errno, err_str, sizeof(err_str));
			ERROR("Failed to create AIO eventfd: %s\n", err_str);
			return;
		}

		pdata.aio_ctx = 0;
		ret = io_setup(1, &pdata.aio_ctx);
		if (ret < 0) {
			iio_strerror(-ret, err_str, sizeof(err_str));
			ERROR("Failed to create AIO context: %s\n", err_str);
			close(pdata.aio_eventfd);
			return;
		}
		pthread_mutex_init(&pdata.aio_mutex, NULL);
		pdata.readfd = readfd_aio;
		pdata.writefd = writefd_aio;
#endif
	} else {
		pdata.readfd = readfd_io;
		pdata.writefd = writefd_io;
	}

	yylex_init_extra(&pdata, &scanner);

	/* Parse commands until the client leaves or asks to stop */
	do {
		if (verbose)
			output(&pdata, "iio-daemon > ");
		ret = yyparse(scanner);
	} while (!pdata.stop && ret >= 0);

	yylex_destroy(scanner);

	/* Close all opened devices */
	for (i = 0; i < ctx->nb_devices; i++)
		close_dev_helper(&pdata, ctx->devices[i]);

#if WITH_AIO
	if (use_aio) {
		io_destroy(pdata.aio_ctx);
		close(pdata.aio_eventfd);
	}
#endif
}