This program lets you edit the bodies of fragments of event-built data. Here, the body of a fragment means the body of the ring item, not the full body, which also contains the ring item header and the optional body header.
The program recomputes the size of the body and properly updates both the ring item header and the fragment header size fields prior to outputting the event.
The program relies on user code located in a shared library whose path is passed to the program at run-time.
--help
Outputs a brief summary of the program options and exits. All other options are ignored. See --version below, however.
--version
Prints out the program name and version. All other options are ignored. If --help and --version are both specified, only the first of those options is acted on and all other options are ignored.
--source URI
Specifies the data source. The data source can specify a ring buffer (local or remote) or a file. This option is mandatory.
--sink URI
Specifies the data sink, that is, where the edited data are written. The URI can specify either a file or a ring buffer. If a ring buffer is specified, the host name must either be localhost or resolve to the local computer.
This option is mandatory.
--workers integer
Specifies the number of worker threads/processes to start. Workers are handed chunks of events to operate on in parallel. In addition to the workers there are three more threads/processes:
An input thread/process reads data from the data source and distributes chunks of complete ring items to workers on demand. This distribution of events is load balanced and uses a pull application level protocol.
The workers send data to a sorting thread/process which re-orders the data by increasing time-stamp.
The sort thread/process sends data to an output thread/process, which writes data chunks to the data sink.
Processing is thus a four-stage pipeline. All stages run in parallel, and the second stage is itself a farm of workers that operate in parallel on segments (clumps) of input data to produce the output data.
This option is mandatory.
--clump-size integer
The value of this option is the number of ring items each worker gets in response to a data pull. Larger clump sizes amortize the communications overhead better. Smaller clumps give better responsiveness at the program's output, since the sorter needs to see contributions from all workers before it can sort.
This option is optional and defaults to 1, which is a poor choice for most cases.
--parallel-strategy threaded | mpi
Specifies which parallelization library to use. Threaded parallelism runs all processing elements on a single machine and is suitable for high core count systems. MPI parallelism uses OpenMPI for messaging and, when the program is started with mpirun, allows you to distribute the computation across several networked nodes.
When using mpi parallelism, the number of processes specified by the mpirun -np flag must be at least 4. The actual number of workers is computed from -np rather than from --workers, and is -np - 3. If this value is not consistent with the value specified by --workers, a warning is emitted to the stderr of the rank 0 process.
This option is optional and defaults to threaded.
--editorlib file-path
Specifies the path of the shared library that contains the user code that edits the event. See USER CODE below for more information, as well as EXAMPLE.
Note that if the library is not in the default library load path, your path specification must include enough information to locate the directory, e.g. ./libMylib.so as opposed to libMylib.so.
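The --workers and --clump-size options describe a pull protocol: an idle worker asks for the next clump, so faster workers simply ask more often, which is what balances the load. The following is a minimal sketch of that idea using std::thread in one process, assuming nothing about this program's internals. ClumpSource and runFarm are hypothetical names, and a real worker would edit ring items rather than just count them.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the input stage: hands out clumps of item
// indices to whichever worker pulls next.
class ClumpSource {
public:
    ClumpSource(std::size_t nItems, std::size_t clumpSize)
        : m_next(0), m_nItems(nItems), m_clumpSize(clumpSize) {}

    // Returns the [begin, end) range of the next clump; begin == end when drained.
    std::pair<std::size_t, std::size_t> pull() {
        std::lock_guard<std::mutex> lock(m_mutex);
        std::size_t begin = m_next;
        std::size_t end = std::min(m_nItems, begin + m_clumpSize);
        m_next = end;
        return {begin, end};
    }

private:
    std::mutex  m_mutex;
    std::size_t m_next;
    std::size_t m_nItems;
    std::size_t m_clumpSize;
};

// Starts nWorkers threads that pull clumps until the source is empty and
// returns how many items each worker handled.
std::vector<std::size_t> runFarm(std::size_t nItems, std::size_t clumpSize,
                                 unsigned nWorkers) {
    ClumpSource source(nItems, clumpSize);
    std::vector<std::size_t> processed(nWorkers, 0);
    std::vector<std::thread> workers;
    for (unsigned w = 0; w < nWorkers; ++w) {
        workers.emplace_back([&source, &processed, w]() {
            for (;;) {
                auto clump = source.pull();
                if (clump.first == clump.second) break;  // no more data
                // A real worker would edit each ring item in the clump here.
                processed[w] += clump.second - clump.first;  // each thread owns its slot
            }
        });
    }
    for (auto& t : workers) t.join();
    return processed;
}
```

With a clump size of 1, every item costs one locked pull. Raising it reduces that overhead but makes each worker hold more items before anything reaches the sorter, which is the trade-off --clump-size controls.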
User code is built into a shared library that must provide:
An implementation of a class derived from CBuiltRingItemEditor::BodyEditor. See CBuiltRingItemEditor.h.
A factory function with C bindings named createEditor, which is called with no parameters and is expected to return a pointer to a new, dynamically created CBuiltRingItemEditor::BodyEditor object (or an object of a class derived from it).
A key data structure we will be using is CBuiltRingItemEditor::BodySegment. Objects of this type describe chunks of the output event body. This structure allows in-place editing with minimal data movement. That is, body segments can describe chunks of the original event as well as new data.
The CBuiltRingItemEditor::BodySegment is a struct with the following fields:
s_isDynamic
Specifies whether the data described must be deleted once the event has been emitted. The worker process operates on all events in a chunk before emitting the edited events in that chunk. If you add data to an event that is unique to that event, that data must be dynamically allocated and this field in its descriptor must be true.
s_description
Describes a chunk of data. This is a struct iovec, as described in the man page for writev. It has an iov_base, which points to the data in the chunk, and an iov_len, which contains the number of bytes in that chunk.
The job of the editing class is to produce a vector of CBuiltRingItemEditor::BodySegment objects that describe the data in the edited body.
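To make the role of that vector concrete, here is a hedged sketch of what a caller can do with it: the new body size is the sum of the iov_len values (the program recomputes the body size before updating the headers), the output body is the segments concatenated in order, and after emission only dynamic segments are handed to a release function. SegmentSketch, editedBodySize, gather and releaseDynamic are hypothetical names; SegmentSketch mirrors only the two documented fields and is not the real BodySegment.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <sys/uio.h>
#include <vector>

// Hypothetical mirror of CBuiltRingItemEditor::BodySegment (documented fields only).
struct SegmentSketch {
    bool  s_isDynamic;
    iovec s_description;
};

// Size of the edited body: the sum of the segment lengths.
std::size_t editedBodySize(const std::vector<SegmentSketch>& segs) {
    std::size_t n = 0;
    for (const auto& s : segs) n += s.s_description.iov_len;
    return n;
}

// Concatenate the segments, in order, into one buffer. writev(2) would write
// the same byte sequence to a file descriptor directly from the descriptors.
std::vector<std::uint8_t> gather(const std::vector<SegmentSketch>& segs) {
    std::vector<std::uint8_t> out;
    out.reserve(editedBodySize(segs));
    for (const auto& s : segs) {
        auto p = static_cast<const std::uint8_t*>(s.s_description.iov_base);
        out.insert(out.end(), p, p + s.s_description.iov_len);
    }
    return out;
}

// After emission: pass each dynamic segment to a release callback (the role
// BodyEditor::free plays); segments pointing into the original event are skipped.
template <typename Release>
std::size_t releaseDynamic(std::vector<SegmentSketch>& segs, Release release) {
    std::size_t n = 0;
    for (auto& s : segs) {
        if (s.s_isDynamic) {
            release(s.s_description);
            ++n;
        }
    }
    return n;
}
```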
Let's look at the CBuiltRingItemEditor::BodyEditor class definition and its methods.
class BodyEditor {
public:
    virtual std::vector<BodySegment> operator()(
        pRingItemHeader pHdr, pBodyHeader hdr, size_t bodySize, void* pBody
    ) = 0;
    virtual void free(iovec& item) = 0;
};
Note that the pRingItemHeader and pBodyHeader types are defined in DataFormat.h and described in the reference section 3daq.
operator() is called for each fragment of each event. pHdr points to the ring item header of that fragment (fragments are assumed to be ring items). hdr points to the body header of the fragment's ring item. This program requires that event fragments have body headers (as pretty much all do now). Finally, bodySize is the number of bytes in the body of the fragment ring item, and pBody points to that body.
operator() returns a vector of body segment descriptors that describe what the new body will look like. The method normally should not modify the ring item and body headers; the caller performs any modifications needed for the new size of the body. Here's an example of code that returns the original body unmodified:
...
{
    std::vector<BodySegment> result;
    BodySegment body(bodySize, pBody);   // describe the original body, in place
    result.push_back(body);
    return result;
}
This is a valid though uninteresting example.
Data that you add to the body must normally be dynamically allocated. This is because every fragment of every event in the clump is processed before the resulting data are passed to the sorting thread/process, so storage local to a single call of operator() would no longer exist by then. Here's sample code that inserts a uint32_t 0xa5a5a5a5 at the beginning of the fragment body and a 0x5a5a5a5a after the existing body:
{
    std::vector<BodySegment> result;

    // New data must outlive this call, so it is allocated dynamically.
    uint32_t* pHeader  = new uint32_t(0xa5a5a5a5);
    uint32_t* pTrailer = new uint32_t(0x5a5a5a5a);

    BodySegment descHeader(sizeof(uint32_t), pHeader, true);    // dynamic
    BodySegment body(bodySize, pBody);                          // original data
    BodySegment descTrailer(sizeof(uint32_t), pTrailer, true);  // dynamic

    result.push_back(descHeader);
    result.push_back(body);
    result.push_back(descTrailer);
    return result;
}
The key point here is that there is no need to open up space for the header by sliding the original data down, or to move any data at all. Just create the data for the header and trailer and provide descriptors. Note that the s_isDynamic field for those items is true.
After events have been emitted, the event editor goes through the set of descriptors for each event and invokes free for each dynamically allocated segment. This method is passed the s_description field of that descriptor and should dispose of the dynamic storage associated with it. For our previous example, free casts iov_base back to a uint32_t pointer and deletes it, as the complete example below shows.
Let's put together the code in the previous section that inserts headers and trailers in each fragment into a complete example.
#include <CBuiltRingItemEditor.h>
#include <DataFormat.h>
#include <sys/uio.h>     // iovec
#include <cstdint>
#include <vector>

class MyEditor : public CBuiltRingItemEditor::BodyEditor
{
public:
    virtual std::vector<CBuiltRingItemEditor::BodySegment> operator()(
        pRingItemHeader pHdr, pBodyHeader hdr, size_t bodySize, void* pBody
    );
    virtual void free(iovec& item);
};

std::vector<CBuiltRingItemEditor::BodySegment>
MyEditor::operator()(
    pRingItemHeader pHdr, pBodyHeader hdr, size_t bodySize, void* pBody
)
{
    std::vector<CBuiltRingItemEditor::BodySegment> result;

    // Must live until free() is called, so allocate dynamically.
    uint32_t* pHeader  = new uint32_t(0xa5a5a5a5);
    uint32_t* pTrailer = new uint32_t(0x5a5a5a5a);

    // Named 'header' so it does not collide with the parameter 'hdr'.
    CBuiltRingItemEditor::BodySegment header(sizeof(uint32_t), pHeader, true);
    CBuiltRingItemEditor::BodySegment body(bodySize, pBody);
    CBuiltRingItemEditor::BodySegment trailer(sizeof(uint32_t), pTrailer, true);

    result.push_back(header);
    result.push_back(body);
    result.push_back(trailer);
    return result;
}

void MyEditor::free(iovec& item)
{
    uint32_t* p = static_cast<uint32_t*>(item.iov_base);
    delete p;
}

extern "C" {
    CBuiltRingItemEditor::BodyEditor* createEditor()
    {
        return new MyEditor;
    }
}
The #include directives bring in the definitions needed by MyEditor below.
MyEditor is derived from CBuiltRingItemEditor::BodyEditor and promises to supply implementations of its base class's pure virtual methods.
The header and trailer must outlive the call to operator() (they are only released when free is called for them). Therefore, even though they are simple uint32_t objects, they must be new'd into existence and initialized.
free is invoked for each header and trailer. We cast the pointer to the data back to a uint32_t pointer and delete it.
Note that if you have several types of data you're adding to the event, you may need to use iov_len to decide how to delete each item, or better yet, make them all derive from a common base class with a virtual destructor. You can then cast to that base class and delete.
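As a hedged sketch of the iov_len approach, suppose an editor adds two kinds of dynamic data: 4-byte uint32_t words and 2-byte uint16_t words. Because their sizes differ, the length recorded in the descriptor identifies which type was new'd. freeByLength is a hypothetical helper that a free override could call; it only works while every added type has a distinct size.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <sys/uio.h>

// Hypothetical helper for a BodyEditor::free override. Deletes the object
// described by 'item' using its length to recover the type, and returns the
// number of bytes released (0 if the length is not recognized).
std::size_t freeByLength(iovec& item) {
    switch (item.iov_len) {
    case sizeof(std::uint32_t):
        delete static_cast<std::uint32_t*>(item.iov_base);
        break;
    case sizeof(std::uint16_t):
        delete static_cast<std::uint16_t*>(item.iov_base);
        break;
    default:
        return 0;  // unknown size: not ours to delete (a real editor might log it)
    }
    std::size_t released = item.iov_len;
    item.iov_base = nullptr;  // guard against a second release
    item.iov_len  = 0;
    return released;
}
```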
The program needs a factory function with C bindings to create body editors; createEditor provides it. When the shared library is loaded, this entry point is located and invoked to obtain pointers to brand-new body editors.