#include <CRingBuffer>
class CRingBuffer {
}
static void create(std::string name, size_t dataBytes = m_defaultDataSize, size_t maxConsumer = m_defaultMaxConsumers, bool tempConnection = false);
static CRingBuffer* createAndProduce(std::string name, size_t dataBytes = m_defaultDataSize, size_t maxConsumer = m_defaultMaxConsumers, bool tempConnection = false);
static void remove(std::string name);
static void format(std::string name, size_t maxConsumer = m_defaultMaxConsumers);
static bool isRing(std::string name);
static void setDefaultRingSize(size_t byteCount);
static size_t getDefaultRingSize();
static void setDefaultMaxConsumers(size_t numConsumers);
static size_t getDefaultMaxConsumers();
static std::string defaultRing();
static std::string defaultRing();
static std::string defaultRingUrl();
CRingBuffer(std::string name, ClientMode mode = CRingBuffer::consumer);
virtual ~CRingBuffer();
size_t put(void* pBuffer, size_t nBytes, unsigned long timeout = ULONG_MAX);
size_t get(void* pBuffer, size_t maxBytes, size_t minBytes = 1, unsigned long timeout = ULONG_MAX);
size_t peek(void* pBuffer, size_t maxbytes);
void skip(size_t nBytes);
unsigned long setPollInterval(unsigned long newValue);
unsigned long getPollInterval();
size_t availablePutSpace();
size_t availableData();
CRingBuffer::Usage getUsage();
int blockWhile(CRingBuffer::CRingBufferPredicate& pred, unsigned long timeout = ULONG_MAX);
void While(CRingBufferPredicate& pred);
off_t getSlot();
void forceProducerRelease();
void forceConsumerRelease(unsigned slot);
CRingBuffer
provides a low level message-based
high performance interprocessor communication mechanism called a
ring buffer. For information about ring buffers in general, see
http://en.wikipedia.org/wiki/Circular_buffer.
CRingBuffer
manages its ring buffer in
shared memory. The shared memory region includes the buffer a
single put pointer and several get pointers. This allows for a
single producer (in the NSCL DAQ, this would generally be a readout
program or event builder), and several consumers (e.g. data analysis,
event recording, scaler displays etc.).
The CRingBuffer
class include static methods
for creating and reinitializing ring buffers as well as
object based members that allow ring buffer clients (both producers
and consumers) to transfer data. For more on how to use
the ring buffer class see
The Ring Buffer Primitives
A unique feature of the CRingBuffer
class is predicate based blocking. This allows a caller to block
until some arbitrary condition is no longer true. Higher level
software that is aware of message structure can use this to select
specific messages from the ring, or implement message sampling.
For more information on that see the
description of blockWhile
below, and
the "Types and public data" section later in this reference page.
static void create(std::string name, size_t dataBytes = m_defaultDataSize, size_t maxConsumer = m_defaultMaxConsumers, bool tempConnection = false);
Creates a new ring buffer. Each ring buffer has a distinct name. Ring buffer names can be just about anything as long as there are no / characters embedded in them.
name
The name of the ring buffer. If you call this for a ring buffer that already exists, the ring buffer will be re-initialized. This has undesirable effects if clients are connected to the ring.
dataBytes
Determines the number of bytes of data that can exist
in the ring buffer without having been consumed and without
blocking another put
operation.
Note that due to the need to page align the shared memory
region, this value is a lower bound on the actual number
of bytes of storage.
This paramteer is optional and has a default value.
setDefaultRingSize
can set the
default value for this parameter, and
getDefaultRingSize
can provide the current value for this default.
maxConsumer
Provides the maximum number of consumers that can connect
to the ring simultaneously. This is an optional parameter.
The value this parameter defaults to can be modifed and
read using the
setDefaultMaxConsumers
and
getDefaultMaxConsumers
respectively.
tempConnection
If this is true, the ring buffer is registered with the Ring master using a temporary, one-time connection which is closed down immediately after registration. This allows children to be created that don't inherit an open socket if done early enough in the game.
Note that no restrictions are imposed on which users can read/write from/to the ring buffer. That is the ring buffer, which is encapsulated in a POSIX shared memory file, has the permissions mode 0666.
static CRingBuffer* createAndProduce(std::string name, size_t dataBytes = m_defaultDataSize, size_t maxConsumer = m_defaultMaxConsumers, bool tempConnection = false);
If the ringbuffer name
does not exist it is
created. The caller is then attached to the ring as a producer
and the CRingBuffer
object created is returned
to the caller. The return is a pointer to a dynamically allocated
CRingBuffer
which must be deleted by the
client.
Note that if the ring already exists, only the
name
parameter matters. The other parameters
are used for name creation and have the same meaning as for
create
static void remove(std::string name);
Marks the shared memory region assocaited with a ring buffer for deletion. Once no more processes are attached to the ring buffer, or have file descriptors open on the ring buffer, the shared memory region is deleted.
static void format(std::string name, size_t maxConsumer = m_defaultMaxConsumers);
Formats a ring buffer. Formatting a ring buffer empties it and resets all of the ring get/put pointers to not-owned. It is a very bad thing to format a ring buffer that is active.
name
Provides the name of the ring buffer. This must be the name of an existing ring buffer.
maxConsumer
Defines the maximum number of consumer programs that can connect
to the ring buffer. This is an optional parameter. The
default for this parameter can be gotten and set using
setDefaultMaxConsumers
and
getDefaultMaxConsumers
respectively.
Since
format
does not resize the shared memory
region containing the buffer and its control data, the size of the
data will change if the number of consumers is changed from the value
used to create the initial ring.
static bool isRing(std::string name);
Ring buffers are shared memory segments, but not all shared memory segments are rings. Ring buffers have signature bytes in their headers that allow software to unambiguously determine if a shared memory region is a ring buffer or something else.
This function returns true
If a ring named
name
exists. If it does not (either there's no corresponding shared
memory region or there is one, but it does not have the correct
signature bytes to qualify as a ring buffer), the function
returns
false
static void setDefaultRingSize(size_t byteCount);
Provides a new value, byteCount
for the default
ring data size. The function returns the prior default value.
static size_t getDefaultRingSize();
Returns the default size of the data region of a ring.
static void setDefaultMaxConsumers(size_t numConsumers);
Sets the default number of consumers supported by a new ring buffer
to numConsumers
.
static size_t getDefaultMaxConsumers();
Returns the default maximum number of consumers that will be supported by the creation of a new ring buffer.
static std::string defaultRing();
Returns the default name of the ring for most NSCLDAQ cases. This will be the name of the logged in user.
static std::string defaultRingUrl();
Returns a URL for the default ring for most NSCLDAQ applications.
If CRingBuffer::defaultRing
returns
user, this function would return:
tcp://localhost/user.
CRingBuffer(std::string name, ClientMode mode = CRingBuffer::consumer);
Creates a new ring buffer object. Ring buffer objects are used to
connect clients to ring buffers created via
CRingBuffer::create
.
A client can be a manager, a producer or a consumer.
Only one producer can be attached to the ring buffer at a time.
Several consumers and an unlimited number of managers
can attach simlutaneously.
The name
parameter is the name of the ring
buffer to which the object will be attached. This ring buffer
must have been created with the CRingBuffer::create
member function.
mode
determines the type of client connection
to be formed. This is an optional parameter that defaults to
CRingBuffer::consumer, which creates a consumer
attachment. The value
CRingBuffer::producer
creates a producer attachment.
CRingBuffer::manager
creates a manager attachment.
virtual ~CRingBuffer();
Destroys a ring buffer attachment object. The object's destruction results in a release of the access pointer associated with the ring buffer. If the object was a producer attachment, a new producer can attach. IF the object was a consumer attachment, that consumer pointer becomes available for a new consumer.
size_t put(void* pBuffer, size_t nBytes, unsigned long timeout = ULONG_MAX);
Puts data into the ring buffer. The object must hold a producer attachment.
pBuffer
Pointer to the data to be transferred to the ring buffer.
nBytes
The number of bytes to transfer to the ring buffer from the buffer.
timeout
The maximum number of seconds to wait for free space to
be available in the ring buffer prior to failing the
put
operation. This is an optional parameter that defaults to
essentially forever (the largest value that can fit in a longword,
in a 32 bit longword this is about 136 years.
The return value is the number of bytes actually written. This should
either be the value of nBytes
, indicating the
entire bufer was transferred to the ring buffer or 0 indicating that
a wait for free space timed out.
size_t get(void* pBuffer, size_t maxBytes, size_t minBytes = 1, unsigned long timeout = ULONG_MAX);
Retrieves a block of data from the ring buffer. This function allows the caller to specify both a maximum, and a minimum size of the data to return.
pBuffer
Points to the storage in which the data from the ring buffer will be stored.
maxBytes
Specifies the maximum number of bytes that can be transferred to the buffer (this is usually, but not always, the size of the buffer).
minBytes
Specifies the smallest message that can be transferred to
the caller's buffer. The function will block until
minBytes
of data is available for this
consumer or until the timeout.
timeout
Determines the maximum number of seconds the function will
block for minBytes
to be available.
This is an optional parameter and defaults to essentially
forever (in the limit as 136 years approaches forever).
The function returns the number of bytes actually read. The value
0 implies the read timed out, or that
minBytes
was 0 and no
bytes were available.
size_t peek(void* pBuffer, size_t maxbytes);
Returns data from the ring buffer without actually removing it. This function never blocks.
pBuffer
A pointer to a buffer that will hold the data retrieved from the ring.
maxbytes
Returns the maximum number of bytes that can be transferred to the buffer.
The return value from this function is the actual number of bytes
transferred. Since this function never blocks. This can be
any value from 0 though maxbytes
.
void skip(size_t nBytes);
Skips the consumer client's get pointer forward by nBytes
,
wrapping as neeed. peek
and Skip
can be used to conditionally read data from the ring. For example, a peek
at the front of the available data could determine the size and type of
a message. The message could then either be read with get
,
or skipped over with skip
.
unsigned long setPollInterval(unsigned long newValue);
The
blockWhile
function polls a predicate blocking
between polls for some polling interval.
setPollIntervale
allows the caller to set the
pollinterval for blocking operations on a ring.
newValue
is the number of milliseconds between polls.
The return value is the previous value of the polling interval.
unsigned long getPollInterval();
See setPollInterval
above.
This method returns the current polling interval.
size_t availablePutSpace();
Returns the number of bytes that can currently be put in the buffer without the producer blocking.
size_t availableData();
Returns the amount of data available to this consumer.
CRingBuffer::Usage getUsage();
Returns a number of pieces of status and statistics information about
the ring buffer. See "Types and public data" below for more information
about the CRingBuffer::USage
structure.
int blockWhile(CRingBuffer::CRingBufferPredicate& pred, unsigned long timeout = ULONG_MAX);
Blocks the caller by calling the operator()
method of the pred
parameter. Blocking continues
as long as this predicate call returns true or until the
timeout
seconds have passed.
timeout
is an optional parameter that defaults to
136 years (or essentially forever).
void While(CRingBufferPredicate& pred);
Repeatedly calls the predicate pred
until it returns
false. The method polls the predicate rather than
blocking between calls. One use of this function is to skip forward
through the ring buffer until a message that matches some specific
pattern is encountered.
off_t getSlot();
Returns the consumer slot that corresponds to the object. If the object is a producer or a manager object, the return value is -1.
void forceProducerRelease();
Forces the ring to no longer have a producer attachment. This is intended to remove producers that have exited abnormally.
void forceConsumerRelease(unsigned slot);
Forces the consumer slot slot
to be freee for
reallocation. This is intended to be used to remove consumers that
have exited abnormally.
The
CRingBuffer
class defines several nested
data types:
This is an enumerated type that is used to select the type
of connection desired when creating an instance of a
CRingBuffer
object.
Possible values are:
Requests a producer connection to the ring buffer.
Requests a consumer connection to the ring buffer.
Requests a manager connection to the ring buffer. normal user code should not do this. This sort of connection allows force disconnection of existing connection.
This structure is used to pass information about the status of
a ring from the getUsage
method. The
struct contains the following members:
s_bufferSpace
Contains the size of the ring buffer in bytes.
s_putSpace
Contains the number of bytes the producer could put in the buffer without blocking.
s_maxConsumers
Contains the number of consumers that can be simultaneously connected to the ring buffer.
s_producer
Contains the process id of the ring buffer.
s_maxGetSpace
Contains the number of bytes that could be retrieved by the consumer that is furthest behind in reading the buffer.
s_minGetSpace
Returns the number of bytes that could be retrieved by the consumer that's most up to date in reading the buffer.
s_consumers
This vector returns information about each client currently attached to the ring buffer. The first element of each pair is the process id of the consumer. The second element of each pair is the number of bytes that consumer could get without blocking.
CRingBuffer::CRingBufferPredicate
provides an abstract
baseclass taht defines an interface for blocking predicate.
A blocking predicate is a function like object that can be used
by blockWhile
to know how long to block.
The
CRingBuffer::CRingBufferPredicate
class defines only a single method:
bool operator()(virtual CRingBuffer& ring);
On each pass through its loop, the method
blockWhile
invokes its predicates
operator()
and continues blocking if
that method returns true.
Some of the member functions in
CRingBuffer
will throw exceptions:
CErrnoException
Will be thrown when a system call returns an error in a
system call that provides additional information in
the errno
variable.
CRangeException
Thrown when some value is not in a legal range. For
example, a user is requesting a get with
minbytes
larger than the
total number of bytes the ring buffer can hold.
Rather than blocking forever, this exception is thrown.
CStateException
Normally thrown if a consumer is attempting functions that are only allowed to producers, and vica versa.
The example below shows how to create a predicate that filters
messages. We assume that each messages has a two longword header
that is defined in the struct header
.
s_size
is the size of the message,
including the header, and type
is
the message type code.
The example predicate allows the application to skip all messages that don't match the desired message type.
Example 1. Message filter predicate
class CMessageFilterPredicate : public CRingBuffer::CRingBufferPredicate { private: long m_desiredType; public: CMessageFilterPredicate(long desiredType) : m_desiredType(desiredType) {} virtual bool operator()(CRingBuffer& ring); }; bool CMessageFilterPredicate::operator()(CRingBuffer& ring) { // peek the header: struct header hdr; size_t numRead = ring.peek(&hdr, sizeof(hdr)); if(numRead != sizeof(hdr)) return true; if (hdr.s_type != m_desiredType) { ring.skip(hdr.s_size); return true; } return false; } ... CRingBuffer myring(string("myring")); ... // Wait for a message of type 3... // then read it. CMessageFilterPredicate filter(3); myring.blockWhile(filter); struct header &hdr; myring.peek(&hdr, sizeof(hdr)); char* message = new message[hdr.s_size]; myring.get(message, hdr.s_size, hdr.s_size); (11) ...
CMessageFilterPredicate
that is derived from the
CRingBuffer::CRingPRedicate
base class.
Objects that are CMessageFilterPredicate
objects can therefore be passed as predicates to the
CRingBuffer::blockWhile
method.
CMessageFilterPredicate
will save
the desired messsage type in member data it can be used
to compare the types of messages in the ring buffer to
the desired message type.
CRingBuffer::CRingBufferPredicate
class must implement the
operator()
.
This method is called by the polling loop in
CRingBuffer::blockWhile
.
CRingBuffer::blockWhile
continue
blocking.
put
operation. If the header of
shows that the message type is not the one we're looking for,
the entire message is skipped and
CRingBuffer::blockWhile
is told to
continue to block.
CRingBuffer::blockWhile
to stop
blocking.
CMessageFilterPredicate
object passing the constructor the message type code
we want to see.
CRingBuffer::blockWhile
will not
return until a message of type 3
is found. Due to the
skip
calls in the predicate,
when control returns, the next available data in the
ring buffer is the desired message.