runtime/dyninst/transport.c - systemtap
Global variables defined
Functions defined
Macros defined
Source code
/* -*- linux-c -*-
* Transport Functions
* Copyright (C) 2013 Red Hat Inc.
*
* This file is part of systemtap, and is free software. You can
* redistribute it and/or modify it under the terms of the GNU General
* Public License (GPL); either version 2, or (at your option) any
* later version.
*/
#ifndef _STAPDYN_TRANSPORT_C_
#define _STAPDYN_TRANSPORT_C_
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <spawn.h>
#include <sys/syscall.h>
#include <errno.h>
#include <string.h>
#include <search.h>
#include <signal.h>
#include "transport.h"
////////////////////////////////////////
//
// GENERAL TRANSPORT OVERVIEW
//
// Each context structure has a '_stp_transport_context_data'
// structure (described in more detail later) in it, which contains
// that context's print and log (warning/error) buffers. There is a
// session-wide double-buffered queue (stored in the
// '_stp_transport_session_data' structure) where each probe can send
// print/control messages to a fairly simple consumer thread (see
// _stp_dyninst_transport_thread_func() for details). The consumer
// thread swaps the read/write queues, then handles each request.
//
// Note that there is as little as possible data copying going on. A
// probe adds data to a print/log buffer stored in shared memory, then
// the consumer queue outputs the data from that same buffer.
//
//
// QUEUE OVERVIEW
//
// See the session-wide queue's definition in transport.h. It is
// composed of the '_stp_transport_queue_item', '_stp_transport_queue'
// and '_stp_transport_session_data' structures.
//
// The queue is double-buffered and stored in shared memory. Because
// it is session-wide, and multiple threads can be trying to add data
// to it simultaneously, the 'queue_mutex' is used to serialize
// access. Probes write to the write queue. When the consumer thread
// realizes data is available, it swaps the read/write queues (by
// changing the 'write_queue' value) and then processes each
// '_stp_transport_queue_item' on the read queue.
//
// If the queue is full, probes will wait on the 'queue_space_avail'
// condition variable for more space. The consumer thread sets
// 'queue_space_avail' when it swaps the read/write queues.
//
// The consumer thread waits on the 'queue_data_avail' condition
// variable to know when more items are available. When probes add
// items to the queue (using __stp_dyninst_transport_queue_add()),
// 'queue_data_avail' gets set.
//
//
// LOG BUFFER OVERVIEW
//
// See the context-specific log buffer's (struct
// _stp_transport_context_data) definition in transport.h.
//
// The log buffer, used for warning/error messages, is stored in
// shared memory. Each context structure has its own log buffer. Each
// log buffer logically contains '_STP_LOG_BUF_ENTRIES' buffers of
// length 'STP_LOG_BUF_LEN'. In other words, the log buffer allocation
// is done in chunks of size 'STP_LOG_BUF_LEN'. The log buffer is
// circular, and the indices use an extra most significant bit to
// indicate wrapping.
//
// Only the consumer thread removes items from the log buffer.
//
// If the log buffer is full, probes will wait on the
// 'log_space_avail' condition variable for more space. The consumer
// thread sets 'log_space_avail' after finishing with a particular log
// buffer chunk.
//
// Note that the read index 'log_start' is only written to by the
// consumer thread and that the write index 'log_end' is only written
// to by the probes (with a locked context).
//
//
// PRINT BUFFER OVERVIEW
//
// See the context-specific print buffer definition (struct
// _stp_transport_context_data) in transport.h.
//
// The print buffer is stored in shared memory. Each context structure
// has its own print buffer. The print buffer really isn't a true
// circular buffer, it is more like a "semi-circular" buffer. If a
// reservation request won't fit after the write offset, we go ahead
// and wrap around to the beginning (if available), leaving an unused
// gap at the end of the buffer. This is done to not break up
// reservation requests. Like a circular buffer, the offsets use an
// extra most significant bit to indicate wrapping.
//
// Only the consumer thread (normally) removes items from the print
// buffer. It is possible to 'unreserve' bytes using
// _stp_dyninst_transport_unreserve_bytes() if the bytes haven't been
// flushed.
//
// If the print buffer doesn't have enough bytes available, probes
// will flush any reserved bytes earlier than normal, then wait on the
// 'print_space_avail' condition variable for more space to become
// available. The consumer thread sets 'print_space_avail' after
// finishing with a particular print buffer segment.
//
// Note that the read index 'read_offset' is only written to by the
// consumer thread and that the write index 'write_offset' (and number
// of bytes to write 'write_bytes') is only written to by the probes
// (with a locked context).
//
////////////////////////////////////////
// Handle of the transport (consumer) thread, filled in by
// pthread_create() in _stp_dyninst_transport_session_start().
static pthread_t _stp_transport_thread;
// Set to 1 once the transport thread has been created; checked and
// cleared again by _stp_dyninst_transport_shutdown().
static int _stp_transport_thread_started = 0;
// Number of seconds _stp_dyninst_transport_reserve_bytes() adds to the
// current time for its timed wait on print buffer space. Can be
// overridden by defining it before this point.
#ifndef STP_DYNINST_TIMEOUT_SECS
#define STP_DYNINST_TIMEOUT_SECS 5
#endif
// When we're converting a circular buffer offset/index into a pointer
// value, we need the "normalized" value (i.e. one without the extra
// msb possibly set). The masking below only yields a valid index if
// the corresponding size is a power of two.
#define _STP_D_T_LOG_NORM(x) ((x) & (_STP_LOG_BUF_ENTRIES - 1))
#define _STP_D_T_PRINT_NORM(x) ((x) & (_STP_DYNINST_BUFFER_SIZE - 1))
// Define a macro to generically add circular buffer
// offsets/indices. The sum is masked to twice the buffer size so that
// the extra most significant (wrap) bit is kept in the result.
#define __STP_D_T_ADD(offset, increment, buffer_size) \
(((offset) + (increment)) & (2 * (buffer_size) - 1))
// Using __STP_D_T_ADD(), define a specific macro for each circular
// buffer: advance a log buffer index by one entry, or a print buffer
// offset by 'increment' bytes.
#define _STP_D_T_LOG_INC(offset) \
__STP_D_T_ADD((offset), 1, _STP_LOG_BUF_ENTRIES)
#define _STP_D_T_PRINT_ADD(offset, increment) \
__STP_D_T_ADD((offset), (increment), _STP_DYNINST_BUFFER_SIZE)
// Return a pointer to the session's current write queue, i.e. the
// element of 'queues' selected by 'write_queue'.
#define _STP_D_T_WRITE_QUEUE(sess_data) \
(&((sess_data)->queues[(sess_data)->write_queue]))
// Limit remembered strings in __stp_d_t_eliminate_duplicate_warnings
#define MAX_STORED_WARNINGS 1024
// If the transport has an error or debug message to print, it can't very well
// recurse on itself, so we just print to the local stderr and hope...
//
// Takes a printf-style format string plus its arguments and hands them to
// vfprintf() on stderr. The separate forward declaration carries the 'format'
// attribute so the compiler can check the arguments against 'fmt'.
static void _stp_transport_err (const char *fmt, ...)
__attribute ((format (printf, 1, 2)));
static void _stp_transport_err (const char *fmt, ...)
{
va_list args;
va_start(args, fmt);
vfprintf (stderr, fmt, args);
va_end(args);
}
// Debug tracing for the transport itself. When DEBUG_TRANS is defined,
// messages go through _stp_transport_err() prefixed with the calling
// function name and line number. Otherwise the macro expands to an
// empty statement and its arguments are not evaluated.
#ifdef DEBUG_TRANS
#define _stp_transport_debug(fmt, ...) \
_stp_transport_err("%s:%d - " fmt, __FUNCTION__, __LINE__, ##__VA_ARGS__)
#else
#define _stp_transport_debug(fmt, ...) do { } while(0)
#endif
/*
 * Append one request to the session's current write queue and wake
 * the consumer thread.
 *
 * type:       item type stored in the queue item
 * data_index: index of the context whose buffer holds the data
 * offset:     offset of the data within that context's buffer
 * bytes:      number of bytes of data
 *
 * Does nothing if there is no transport session data. Blocks on
 * 'queue_space_avail' for as long as the write queue is full.
 */
static void
__stp_dyninst_transport_queue_add(unsigned type, int data_index,
				  size_t offset, size_t bytes)
{
	struct _stp_transport_session_data *session = stp_transport_data();
	struct _stp_transport_queue *wq;
	struct _stp_transport_queue_item *slot;

	if (session == NULL)
		return;

	pthread_mutex_lock(&session->queue_mutex);

	// Wait until the write queue has room. The write queue is
	// looked up again after every wakeup, since the consumer may
	// have swapped the queues while we were waiting.
	for (;;) {
		wq = _STP_D_T_WRITE_QUEUE(session);
		if (wq->items != (STP_DYNINST_QUEUE_ITEMS - 1))
			break;
		pthread_cond_wait(&session->queue_space_avail,
				  &session->queue_mutex);
	}

	// Claim the next free slot and fill it in.
	slot = &wq->queue[wq->items];
	wq->items++;
	slot->type = type;
	slot->data_index = data_index;
	slot->offset = offset;
	slot->bytes = bytes;

	// Tell the consumer there is something to process.
	pthread_cond_signal(&session->queue_data_avail);
	pthread_mutex_unlock(&session->queue_mutex);
}
/* Comparator for the tsearch()/tfind() tree of warning strings. The
 * tree keys are NUL-terminated strings, so order them with strcmp(). */
static int
__stp_d_t_warning_cmp(const void *lhs, const void *rhs)
{
	return strcmp((const char *)lhs, (const char *)rhs);
}

/* Handle duplicate warning elimination. Returns 0 if we've seen this
 * warning (and should be eliminated), 1 otherwise.
 *
 * 'data' points at the message and 'bytes' is its length; a private
 * NUL-terminated copy is made for the lookup. At most
 * MAX_STORED_WARNINGS distinct messages are remembered. Once the
 * table is full, one notice is printed and further new messages are
 * passed through without being stored. */
static int
__stp_d_t_eliminate_duplicate_warnings(char *data, size_t bytes)
{
	static void *seen = NULL;
	static unsigned seen_count = 0;
	char *dupstr = strndup (data, bytes);

	if (! dupstr) {
		/* OOM, should not happen. */
		return 1;
	}

	if (tfind (dupstr, &seen, __stp_d_t_warning_cmp) != NULL) {
		/* old message */
		free (dupstr);
		return 0;
	}

	/* new message */

	/* We set a maximum for stored warning messages, to prevent a
	 * misbehaving script/environment from emitting countless
	 * _stp_warn()s, and overflow staprun's memory. */
	if (seen_count < MAX_STORED_WARNINGS) {
		/* NB: on success don't free dupstr; it's going into
		 * the tree. */
		if (tsearch (dupstr, &seen, __stp_d_t_warning_cmp) == NULL) {
			/* OOM, should not happen. Next time we should
			 * get the 'full' message. */
			free (dupstr);
			seen_count = MAX_STORED_WARNINGS;
		}
		else {
			seen_count++;
		}
		return 1;
	}

	if (seen_count == MAX_STORED_WARNINGS) {
		_stp_transport_err("WARNING deduplication table full\n");
		/* Be quiet in the future; the counter is parked at
		 * this value so it can never overflow. */
		seen_count = MAX_STORED_WARNINGS + 1;
	}
	free (dupstr);
	return 1;
}
/*
 * Launch 'command' through "sh -c" with posix_spawnp(). A spawn
 * failure is reported with _stp_transport_err(); nothing is returned
 * to the caller.
 */
static void
__stp_d_t_run_command(char *command)
{
	/*
	 * FIXME: We'll need to make sure the output from system goes
	 * to the correct file descriptor. We may need some posix file
	 * actions to pass to posix_spawnp().
	 */
	char *shell_argv[4] = { "sh", "-c", command, NULL };
	int err;

	err = posix_spawnp(NULL, "sh", NULL, NULL, shell_argv, NULL);

	/* Notice we're not waiting on the resulting process to finish. */
	if (err == 0)
		return;

	/* posix_spawnp() returns the error number directly. */
	_stp_transport_err("ERROR: %s : %s\n", command, strerror(err));
}
/* Ask for the session to exit by sending SIGTERM to the calling
 * thread with pthread_kill(). */
static void
__stp_d_t_request_exit(void)
{
/*
* We want stapdyn to trigger this module's exit code from outside. It
* knows to do this on receipt of signals, so we must kill ourselves.
* The signal handler will forward that to the main thread.
*
* NB: If the target process was created rather than attached, SIGTERM
* waits for it to exit. SIGQUIT always exits immediately. It's
* somewhat debateable which is most appropriate here...
*/
pthread_kill(pthread_self(), SIGTERM);
}
/*
 * Write all 'count' bytes at 'buf' to 'fd', continuing after short
 * writes and retrying when write() is interrupted (EINTR).
 *
 * Returns 'count' once everything has been written, or write()'s
 * negative return value on any other error (errno is left as set by
 * write()).
 */
static ssize_t
_stp_write_retry(int fd, const void *buf, size_t count)
{
	/* Step through the data with a char pointer: arithmetic on
	 * 'void *' is a GNU extension, not ISO C. */
	const char *cursor = buf;
	size_t remaining = count;

	while (remaining > 0) {
		ssize_t ret = write(fd, cursor, remaining);
		if (ret >= 0) {
			cursor += ret;
			remaining -= (size_t)ret;
		}
		else if (errno != EINTR) {
			return ret;
		}
	}
	return (ssize_t)count;
}
/*
 * Format the local time corresponding to 't' into 'buf' (capacity
 * 'max' bytes) using the strftime() format 'fmt'.
 *
 * Returns the number of characters written (excluding the
 * terminating NUL), or -EINVAL if an argument is unusable, 't'
 * cannot be converted to a local time, or strftime() produced
 * nothing (e.g. the result did not fit in 'max' bytes).
 */
static int
stap_strfloctime(char *buf, size_t max, const char *fmt, time_t t)
{
	struct tm tm;
	size_t ret;

	if (buf == NULL || fmt == NULL || max <= 1)
		return -EINVAL;
	/* localtime_r() returns NULL on failure and then leaves 'tm'
	   indeterminate, so it must not be handed to strftime(). */
	if (localtime_r(&t, &tm) == NULL)
		return -EINVAL;
	/* NB: this following invocation means that stapdyn modules can't be
	   checked with -Wformat-nonliteral. See compile_dyninst()
	   buildrun.cxx for the flags chosen. strftime parsing does not have
	   security implications AFAIK, but gcc still wants to check them. */
	ret = strftime(buf, max, fmt, &tm);
	if (ret == 0)
		return -EINVAL;
	return (int)ret;
}
/*
 * Body of the transport (consumer) thread.
 *
 * Opens the output file if the session attributes name one (the name
 * is expanded with stap_strfloctime()), otherwise writes to stdout.
 * It then loops: wait for items on the write queue, swap the queues,
 * and process the items in two passes, OOB items first and then
 * normal data/exit items. The loop ends after a pass that contained
 * an STP_DYN_EXIT item.
 *
 * 'arg' is unused. Always returns NULL; it returns early, without
 * processing anything, if there is no session data or the output
 * file cannot be set up.
 */
static void *
_stp_dyninst_transport_thread_func(void *arg __attribute((unused)))
{
	int stopping = 0;
	int out_fd, err_fd;
	struct _stp_transport_session_data *sess_data = stp_transport_data();

	if (sess_data == NULL)
		return NULL;

	if (strlen(stp_session_attributes()->outfile_name)) {
		char buf[PATH_MAX];
		int rc;

		rc = stap_strfloctime(buf, PATH_MAX,
				      stp_session_attributes()->outfile_name,
				      time(NULL));
		if (rc < 0) {
			_stp_transport_err("Invalid FILE name format\n");
			return NULL;
		}
		out_fd = open (buf, O_CREAT|O_TRUNC|O_WRONLY|O_CLOEXEC, 0666);
		if (out_fd < 0) {
			// Report open()'s errno. 'rc' only holds the
			// length returned by stap_strfloctime().
			_stp_transport_err("ERROR: Couldn't open output file %s: %s\n",
					   buf, strerror(errno));
			return NULL;
		}
	}
	else
		out_fd = STDOUT_FILENO;
	err_fd = STDERR_FILENO;

	while (! stopping) {
		struct _stp_transport_queue *q;
		struct _stp_transport_queue_item *item;
		struct context *c;
		struct _stp_transport_context_data *data;
		void *read_ptr;

		pthread_mutex_lock(&(sess_data->queue_mutex));
		// While there are no queue entries, wait.
		q = _STP_D_T_WRITE_QUEUE(sess_data);
		while (q->items == 0) {
			// Mutex is locked. It is automatically
			// unlocked while we are waiting.
			pthread_cond_wait(&(sess_data->queue_data_avail),
					  &(sess_data->queue_mutex));
			// Mutex is locked again.
		}

		// We've got data. Swap the queues and let any waiters
		// know there is more space available. 'q' keeps
		// pointing at the queue we are about to read.
		sess_data->write_queue ^= 1;
		pthread_cond_broadcast(&(sess_data->queue_space_avail));
		pthread_mutex_unlock(&(sess_data->queue_mutex));

		// Note that we're processing the read queue with no
		// locking. This is possible since no other thread
		// will be accessing it until we're finished with it
		// (and we make it the write queue).

		// Process the queue twice. First handle the OOB data types.
		for (size_t i = 0; i < q->items; i++) {
			int write_data = 1;

			item = &(q->queue[i]);
			if (! (item->type & STP_DYN_OOB_DATA_MASK))
				continue;

			c = stp_session_context(item->data_index);
			data = &c->transport_data;
			read_ptr = data->log_buf + item->offset;

			switch (item->type) {
			case STP_DYN_OOB_DATA:
				_stp_transport_debug(
					"STP_DYN_OOB_DATA (%ld bytes at offset %ld)\n",
					item->bytes, item->offset);

				/* Note that "WARNING:" should not be
				 * translated, since it is part of the
				 * module cmd protocol. The whole
				 * prefix, colon included, is
				 * compared. */
				if (strncmp(read_ptr, "WARNING:",
					    sizeof("WARNING:") - 1) == 0) {
					if (stp_session_attributes()->suppress_warnings) {
						write_data = 0;
					}
					/* If we're not verbose, eliminate
					 * duplicate warning messages. */
					else if (stp_session_attributes()->log_level
						 == 0) {
						write_data = __stp_d_t_eliminate_duplicate_warnings(read_ptr, item->bytes);
					}
				}
				/* "ERROR:" also should not be translated. */
				else if (strncmp(read_ptr, "ERROR:",
						 sizeof("ERROR:") - 1) == 0) {
					if (_stp_exit_status == 0)
						_stp_exit_status = 1;
				}

				if (! write_data) {
					break;
				}

				if (_stp_write_retry(err_fd, read_ptr, item->bytes) < 0)
					_stp_transport_err(
						"couldn't write %ld bytes OOB data: %s\n",
						(long)item->bytes, strerror(errno));
				break;

			case STP_DYN_SYSTEM:
				_stp_transport_debug("STP_DYN_SYSTEM (%.*s) %d bytes\n",
						     (int)item->bytes, (char *)read_ptr,
						     (int)item->bytes);
				/*
				 * Note that the null character is
				 * already included in the system
				 * string.
				 */
				__stp_d_t_run_command(read_ptr);
				break;

			default:
				_stp_transport_err(
					"Error - unknown OOB item type %d\n",
					item->type);
				break;
			}

			// Signal there is a log buffer available to
			// any waiters.
			data->log_start = _STP_D_T_LOG_INC(data->log_start);
			pthread_mutex_lock(&(data->log_mutex));
			pthread_cond_signal(&(data->log_space_avail));
			pthread_mutex_unlock(&(data->log_mutex));
		}

		// Handle the non-OOB data.
		for (size_t i = 0; i < q->items; i++) {
			item = &(q->queue[i]);

			switch (item->type) {
			case STP_DYN_NORMAL_DATA:
				_stp_transport_debug("STP_DYN_NORMAL_DATA"
						     " (%ld bytes at offset %ld)\n",
						     item->bytes, item->offset);
				c = stp_session_context(item->data_index);
				data = &c->transport_data;
				read_ptr = (data->print_buf
					    + _STP_D_T_PRINT_NORM(item->offset));
				if (_stp_write_retry(out_fd, read_ptr, item->bytes) < 0)
					_stp_transport_err(
						"couldn't write %ld bytes data: %s\n",
						(long)item->bytes, strerror(errno));

				pthread_mutex_lock(&(data->print_mutex));

				// Now we need to update the read
				// pointer, using the data_index we
				// received. Note that we're doing
				// this with or without that context
				// locked, but the print_mutex is
				// locked.
				data->read_offset = _STP_D_T_PRINT_ADD(item->offset, item->bytes);

				// Signal more bytes available to any waiters.
				pthread_cond_signal(&(data->print_space_avail));
				pthread_mutex_unlock(&(data->print_mutex));

				_stp_transport_debug(
					"STP_DYN_NORMAL_DATA flushed,"
					" read_offset %ld, write_offset %ld)\n",
					data->read_offset, data->write_offset);
				break;

			case STP_DYN_EXIT:
				_stp_transport_debug("STP_DYN_EXIT\n");
				// Finish this pass over the queue, then
				// leave the outer loop.
				stopping = 1;
				break;

			case STP_DYN_REQUEST_EXIT:
				_stp_transport_debug("STP_DYN_REQUEST_EXIT\n");
				__stp_d_t_request_exit();
				break;

			default:
				// OOB items were handled in the first
				// pass; anything else is unknown.
				if (! (item->type & STP_DYN_OOB_DATA_MASK)) {
					_stp_transport_err(
						"Error - unknown item type"
						" %d\n", item->type);
				}
				break;
			}
		}

		// We're now finished with the read queue. Clear it
		// out.
		q->items = 0;
	}

	// Release the output file if we opened one ourselves.
	if (out_fd != STDOUT_FILENO)
		close(out_fd);
	return NULL;
}
static int _stp_ctl_send(int type, void *data, unsigned len)
{
_stp_transport_debug("type 0x%x data %p len %d\n",
type, data, len);
// This thread should already have a context structure.
struct context* c = _stp_runtime_get_context();
if (c == NULL)
return EINVAL;
// Currently, we're only handling 'STP_SYSTEM' control
// messages, converting it to a STP_DYN_SYSTEM message.
if (type != STP_SYSTEM)
return 0;
char *buffer = _stp_dyninst_transport_log_buffer();
if (buffer == NULL)
return 0;
memcpy(buffer, data, len);
size_t offset = buffer - c->transport_data.log_buf;
__stp_dyninst_transport_queue_add(STP_DYN_SYSTEM,
c->data_index, offset, len);
return len;
}
// Queue an STP_DYN_EXIT item. When the transport thread processes it,
// the thread leaves its main loop.
static void _stp_dyninst_transport_signal_exit(void)
{
__stp_dyninst_transport_queue_add(STP_DYN_EXIT, 0, 0, 0);
}
// Queue an STP_DYN_REQUEST_EXIT item. When the transport thread
// processes it, the thread calls __stp_d_t_request_exit().
static void _stp_dyninst_transport_request_exit(void)
{
__stp_dyninst_transport_queue_add(STP_DYN_REQUEST_EXIT, 0, 0, 0);
}
/*
 * Initialize the synchronization objects used by the transport: the
 * session-wide queue mutex and condition variables, then the print
 * and log mutex/condition variable of every context.
 *
 * Returns 0 on success. On the first failing initialization it
 * reports the problem with _stp_error() and returns that
 * initializer's error code.
 */
static int _stp_dyninst_transport_session_init(void)
{
	struct _stp_transport_session_data *session = stp_transport_data();
	int err;
	int cpu;

	// Session-wide queue state (skipped if there is no session
	// data).
	if (session != NULL) {
		err = stp_pthread_mutex_init_shared(&session->queue_mutex);
		if (err != 0) {
			_stp_error("transport queue mutex initialization"
				   " failed");
			return err;
		}

		err = stp_pthread_cond_init_shared(&session->queue_space_avail);
		if (err != 0) {
			_stp_error("transport queue space avail cond variable"
				   " initialization failed");
			return err;
		}

		err = stp_pthread_cond_init_shared(&session->queue_data_avail);
		if (err != 0) {
			_stp_error("transport queue empty cond variable"
				   " initialization failed");
			return err;
		}
	}

	// Per-context print and log buffer state.
	for_each_possible_cpu(cpu) {
		struct context *ctx = stp_session_context(cpu);
		struct _stp_transport_context_data *td;

		if (ctx == NULL)
			continue;
		td = &ctx->transport_data;

		err = stp_pthread_mutex_init_shared(&td->print_mutex);
		if (err != 0) {
			_stp_error("transport mutex initialization failed");
			return err;
		}

		err = stp_pthread_cond_init_shared(&td->print_space_avail);
		if (err != 0) {
			_stp_error("transport cond variable initialization failed");
			return err;
		}

		err = stp_pthread_mutex_init_shared(&td->log_mutex);
		if (err != 0) {
			_stp_error("transport log mutex initialization failed");
			return err;
		}

		err = stp_pthread_cond_init_shared(&td->log_space_avail);
		if (err != 0) {
			_stp_error("transport log cond variable initialization failed");
			return err;
		}
	}
	return 0;
}
/*
 * Create the transport (consumer) thread.
 *
 * Returns 0 and marks the thread as started on success; otherwise
 * reports the failure with _stp_error() and returns
 * pthread_create()'s error code.
 */
static int _stp_dyninst_transport_session_start(void)
{
	int err = pthread_create(&_stp_transport_thread, NULL,
				 &_stp_dyninst_transport_thread_func, NULL);

	if (err == 0) {
		// Remember that shutdown has a thread to stop and join.
		_stp_transport_thread_started = 1;
		return 0;
	}

	_stp_error("transport thread creation failed (%d)", err);
	return err;
}
/*
 * Queue 'bytes' bytes of out-of-band (warning/error) data for the
 * transport thread. 'buffer' is expected to point into the calling
 * context's log buffer; only its offset within that buffer is sent.
 *
 * Returns 0 on success, or EINVAL if the caller has no context.
 */
static int
_stp_dyninst_transport_write_oob_data(char *buffer, size_t bytes)
{
	// This thread should already have a context structure.
	struct context *ctx = _stp_runtime_get_context();

	if (ctx != NULL) {
		size_t log_offset = buffer - ctx->transport_data.log_buf;

		__stp_dyninst_transport_queue_add(STP_DYN_OOB_DATA,
						  ctx->data_index,
						  log_offset, bytes);
		return 0;
	}
	return EINVAL;
}
/*
 * Flush the calling context's reserved print bytes: queue them as an
 * STP_DYN_NORMAL_DATA item and advance the write offset past them.
 *
 * Always returns 0. Nothing is queued when there is no context or no
 * bytes are pending.
 */
static int _stp_dyninst_transport_write(void)
{
	// This thread should already have a context structure.
	struct context *ctx = _stp_runtime_get_context();
	struct _stp_transport_context_data *td;
	size_t pending;
	size_t start_offset;

	if (ctx == NULL)
		return 0;

	td = &ctx->transport_data;
	pending = td->write_bytes;
	if (pending == 0)
		return 0;

	// No extra locking is needed here. This probe is the only
	// user of this context, the transport thread (the consumer)
	// only writes 'read_offset', and any probe running
	// concurrently uses a different context.
	_stp_transport_debug(
		"read_offset %ld, write_offset %ld, write_bytes %ld\n",
		td->read_offset, td->write_offset, td->write_bytes);

	// The "raw" (not normalized) write offset is what gets
	// queued; the consumer thread needs it that way.
	start_offset = td->write_offset;
	td->write_bytes = 0;

	// If this flush consumes all remaining bytes in the buffer,
	// the new offset can wrap (but only to either "high" or "low"
	// 0).
	td->write_offset = _STP_D_T_PRINT_ADD(start_offset, pending);

	__stp_dyninst_transport_queue_add(STP_DYN_NORMAL_DATA,
					  ctx->data_index,
					  start_offset, pending);
	return 0;
}
/*
 * Stop the transport thread and destroy the transport's mutexes and
 * condition variables. Does nothing unless the thread was started.
 */
static void _stp_dyninst_transport_shutdown(void)
{
	struct _stp_transport_session_data *session;
	int cpu;

	// Nothing to tear down if the thread never started.
	if (_stp_transport_thread_started != 1)
		return;

	// Ask the thread to stop, then wait for it to finish.
	_stp_dyninst_transport_signal_exit();
	pthread_join(_stp_transport_thread, NULL);
	_stp_transport_thread_started = 0;

	// Session-wide queue state.
	session = stp_transport_data();
	if (session != NULL) {
		pthread_mutex_destroy(&session->queue_mutex);
		pthread_cond_destroy(&session->queue_space_avail);
		pthread_cond_destroy(&session->queue_data_avail);
	}

	// Per-context print and log buffer state.
	for_each_possible_cpu(cpu) {
		struct context *ctx = stp_session_context(cpu);
		struct _stp_transport_context_data *td;

		if (ctx == NULL)
			continue;
		td = &ctx->transport_data;

		pthread_mutex_destroy(&td->print_mutex);
		pthread_cond_destroy(&td->print_space_avail);
		pthread_mutex_destroy(&td->log_mutex);
		pthread_cond_destroy(&td->log_space_avail);
	}
}
// Returns nonzero when 'log_end' equals 'log_start' with its wrap bit
// flipped, i.e. both normalize to the same entry but differ in the
// extra most significant bit. That is the "full" state of the
// circular log buffer; returns 0 otherwise.
static int
_stp_dyninst_transport_log_buffer_full(struct _stp_transport_context_data *data)
{
// This inverts the most significant bit of 'log_start' before
// comparison.
return (data->log_end == (data->log_start ^ _STP_LOG_BUF_ENTRIES));
}
/*
 * Hand out the next free STP_LOG_BUF_LEN-sized chunk of the calling
 * context's log buffer, waiting on 'log_space_avail' while the
 * buffer is full.
 *
 * Returns a pointer to the chunk, or NULL if the caller has no
 * context.
 */
static char *_stp_dyninst_transport_log_buffer(void)
{
	// This thread should already have a context structure.
	struct context *ctx = _stp_runtime_get_context();
	struct _stp_transport_context_data *td;
	char *chunk;

	if (ctx == NULL)
		return NULL;

	// Note that the context structure is locked, so only one
	// probe at a time can be operating on it.
	td = &ctx->transport_data;

	// If there isn't an available log buffer, wait. The mutex is
	// only taken when the buffer looks full.
	if (_stp_dyninst_transport_log_buffer_full(td)) {
		pthread_mutex_lock(&td->log_mutex);
		while (_stp_dyninst_transport_log_buffer_full(td))
			pthread_cond_wait(&td->log_space_avail,
					  &td->log_mutex);
		pthread_mutex_unlock(&td->log_mutex);
	}

	// Normalize 'log_end' to get the entry number, then scale it
	// by STP_LOG_BUF_LEN to get the byte offset of that entry.
	// Every "allocation" here is one STP_LOG_BUF_LEN-sized chunk.
	chunk = &td->log_buf[_STP_D_T_LOG_NORM(td->log_end)
			     * STP_LOG_BUF_LEN];

	// Mark the chunk as used by advancing 'log_end'.
	td->log_end = _STP_D_T_LOG_INC(td->log_end);
	return chunk;
}
/*
 * Number of free print buffer bytes in front of the write offset
 * (i.e. at the start of the buffer), given a cached 'read_offset'.
 */
static size_t
__stp_d_t_space_before(struct _stp_transport_context_data *data,
		       size_t read_offset)
{
	// Both offsets carry an extra most significant "wrap" bit.
	// When the bits match, the write offset has not wrapped
	// relative to the reader and everything below the
	// (normalized) read offset is free. When they differ, the
	// write offset has already wrapped and there is no space
	// before it.
	const int same_lap =
		((read_offset & _STP_DYNINST_BUFFER_SIZE)
		 == (data->write_offset & _STP_DYNINST_BUFFER_SIZE));

	if (same_lap)
		return (_STP_D_T_PRINT_NORM(read_offset));
	return 0;
}
/*
 * Number of free print buffer bytes following the bytes already
 * reserved (write offset plus 'write_bytes'), given a cached
 * 'read_offset'.
 */
static size_t
__stp_d_t_space_after(struct _stp_transport_context_data *data,
		      size_t read_offset)
{
	// End of the reserved region. Computed with the wrapping add
	// so that a completely full buffer is handled correctly.
	size_t reserved_end = _STP_D_T_PRINT_ADD(data->write_offset,
						 data->write_bytes);
	size_t reserved_end_norm = _STP_D_T_PRINT_NORM(reserved_end);

	// Matching wrap bits: the free space runs from the end of the
	// reserved region to the end of the buffer.
	if ((read_offset & _STP_DYNINST_BUFFER_SIZE)
	    == (reserved_end & _STP_DYNINST_BUFFER_SIZE)) {
		return (_STP_DYNINST_BUFFER_SIZE - reserved_end_norm);
	}

	// Differing wrap bits: the write side has wrapped, so the
	// only free space is between the (normalized) end of the
	// reserved region and the (normalized) read offset.
	return (_STP_D_T_PRINT_NORM(read_offset) - reserved_end_norm);
}
// Reserve 'numbytes' contiguous bytes in the calling context's print
// buffer and return a pointer to them.
//
// The space after the already reserved bytes is preferred. If only the
// space at the start of the buffer is large enough, pending bytes are
// flushed first and the write offset is wrapped to the beginning. If
// neither region is large enough, the function flushes, waits on
// 'print_space_avail', then does one timed wait of
// STP_DYNINST_TIMEOUT_SECS seconds before giving up.
//
// Returns NULL if the caller has no context or if enough space could
// not be obtained.
//
// NOTE(review): 'numbytes' is an int while the space values are
// size_t, so a negative 'numbytes' would be converted to a large
// unsigned value in the comparisons below -- confirm callers never
// pass one.
static void *_stp_dyninst_transport_reserve_bytes(int numbytes)
{
void *ret;
// This thread should already have a context structure.
struct context* c = _stp_runtime_get_context();
if (c == NULL) {
_stp_transport_debug("NULL context!\n");
return NULL;
}
struct _stp_transport_context_data *data = &c->transport_data;
size_t space_before, space_after, read_offset;
recheck:
pthread_mutex_lock(&(data->print_mutex));
// If the buffer is empty, reset everything to the
// beginning. This cuts down on fragmentation.
if (data->write_bytes == 0 && data->read_offset == data->write_offset
&& data->read_offset != 0) {
data->read_offset = 0;
data->write_offset = 0;
}
// We cache the read_offset value to get a consistent view of
// the buffer (between calls to get the space before/after).
read_offset = data->read_offset;
pthread_mutex_unlock(&(data->print_mutex));
space_before = __stp_d_t_space_before(data, read_offset);
space_after = __stp_d_t_space_after(data, read_offset);
// If we don't have enough space, try to get more space by
// flushing and/or waiting.
if (space_before < numbytes && space_after < numbytes) {
// First, lock the mutex.
pthread_mutex_lock(&(data->print_mutex));
// There is a race condition here. We've checked for
// available free space, then locked the mutex. It is
// possible for more free space to have become
// available between the time we checked and the time
// we locked the mutex. Recheck the available free
// space.
read_offset = data->read_offset;
space_before = __stp_d_t_space_before(data, read_offset);
space_after = __stp_d_t_space_after(data, read_offset);
// If we still don't have enough space and we have
// data we haven't flushed, go ahead and flush to free
// up space.
if (space_before < numbytes && space_after < numbytes
&& data->write_bytes != 0) {
// Flush the buffer. We have to do this while
// the mutex is locked, so that we can't miss
// the condition change. (If we did flush
// without the mutex locked, it would be
// possible for the consumer thread to signal
// the condition variable before we were
// waiting on it.)
_stp_dyninst_transport_write();
// Mutex is locked. It is automatically
// unlocked while we are waiting.
pthread_cond_wait(&(data->print_space_avail),
&(data->print_mutex));
// Mutex is locked again.
// Recheck available free space.
read_offset = data->read_offset;
space_before = __stp_d_t_space_before(data,
read_offset);
space_after = __stp_d_t_space_after(data, read_offset);
}
// If we don't have enough bytes available, do a timed
// wait for more bytes to become available. This might
// fail if there isn't anything in the queue for this
// context structure.
if (space_before < numbytes && space_after < numbytes) {
_stp_transport_debug(
"waiting for more space, numbytes %d,"
" before %ld, after %ld\n",
numbytes, space_before, space_after);
// Setup a timeout for
// STP_DYNINST_TIMEOUT_SECS seconds into the
// future.
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += STP_DYNINST_TIMEOUT_SECS;
// Mutex is locked. It is automatically
// unlocked while we are waiting.
pthread_cond_timedwait(&(data->print_space_avail),
&(data->print_mutex),
&ts);
// When pthread_cond_timedwait() returns, the
// mutex has been (re)locked.
// Now see if we've got more bytes available.
read_offset = data->read_offset;
space_before = __stp_d_t_space_before(data,
read_offset);
space_after = __stp_d_t_space_after(data, read_offset);
}
// We're finished with the mutex.
pthread_mutex_unlock(&(data->print_mutex));
// If we *still* don't have enough space available,
// quit. We've done all we can do.
if (space_before < numbytes && space_after < numbytes) {
_stp_transport_debug(
"not enough space available,"
" numbytes %d, before %ld, after %ld,"
" read_offset %ld, write_offset %ld\n",
numbytes, space_before, space_after,
read_offset, data->write_offset);
return NULL;
}
}
// OK, now we have enough space, either before or after the
// current write offset.
//
// We prefer using the size after the current write, which
// will help keep writes contiguous.
if (space_after >= numbytes) {
ret = (data->print_buf
+ _STP_D_T_PRINT_NORM(data->write_offset)
+ data->write_bytes);
data->write_bytes += numbytes;
_stp_transport_debug(
"reserve %d bytes after, bytes available"
" (%ld, %ld) read_offset %ld, write_offset %ld,"
" write_bytes %ld\n",
numbytes, space_before, space_after, data->read_offset,
data->write_offset, data->write_bytes);
return ret;
}
// OK, now we know we need to use the space before the write
// offset. If we've got existing bytes that haven't been
// flushed, flush them now.
if (data->write_bytes != 0) {
_stp_dyninst_transport_write();
// Flushing the buffer updates the write_offset, which
// could have caused it to wrap. Start all over.
_stp_transport_debug(
"rechecking available bytes after a flush...\n");
goto recheck;
}
// Wrap the offset around by inverting the most significant
// bit, then clearing out the lower bits.
data->write_offset = ((data->write_offset ^ _STP_DYNINST_BUFFER_SIZE)
& _STP_DYNINST_BUFFER_SIZE);
// The reservation now starts at the very beginning of the
// print buffer. 'write_bytes' is 0 at this point (checked
// above), so it ends up equal to 'numbytes'.
ret = data->print_buf;
data->write_bytes += numbytes;
_stp_transport_debug(
"reserve %d bytes before, bytes available"
" (%ld, %ld) read_offset %ld, write_offset %ld,"
" write_bytes %ld\n",
numbytes, space_before, space_after, data->read_offset,
data->write_offset, data->write_bytes);
return ret;
}
/*
 * Give back the last 'numbytes' reserved-but-unflushed bytes of the
 * calling context's print buffer. The request is ignored if the
 * caller has no context, 'numbytes' is not positive, or it exceeds
 * the number of bytes currently reserved.
 */
static void _stp_dyninst_transport_unreserve_bytes(int numbytes)
{
	// This thread should already have a context structure.
	struct context *ctx = _stp_runtime_get_context();
	struct _stp_transport_context_data *td;

	if (ctx == NULL)
		return;
	td = &ctx->transport_data;

	// The sign test comes first, so only a positive 'numbytes'
	// is ever compared against 'write_bytes'.
	if (unlikely(numbytes <= 0 || numbytes > td->write_bytes))
		return;

	td->write_bytes -= numbytes;
}
#endif /* _STAPDYN_TRANSPORT_C_ */