
MI Dataset Agent Architecture Overview

Dataset Agents are mapped one per instrument. Each agent needs enough information to load the appropriate tools to find and parse all of a particular instrument's data from a dataset. Engineering values for individual platforms, profilers, moorings, etc. will each have their own dataset agent gathering data.

Dataset inputs (piles of files or other streams of data) can be rather involved, with several directories and files to be sorted through for both science and status/engineering information. Timing sequences and various data types may be combined or kept separate depending on the needs of the platform or instrument. To deal with this variety in data structure, the data needs to be gathered in a coherent yet flexible way.

The core concept of dataset logic is that there is a pile of data that needs to be ingested, parsed, and split out via ION publishes for ultimate inclusion in the DM system, as well as near-real-time handling in the SA system. To do this mapping from stored or streamed data to published packets, the MI dataset agent architecture relies on:

  • Agents: Dataset Agents are specific sub-types of Resource Agent that handle basic control of a dataset ingestion (mostly configuring, starting, and stopping), along with the actual publishing of the ingested values into the ION system.
  • Harvesters: These are logical entities that find and gather the correct source of data for ingestion, while also tracking what data has already been ingested. They can find files, walk through directories, take in streams, open URIs, etc.
  • Parsers: These are logical entities that know how to understand what format the presented (by the harvester) data is in and where it needs to go to get into the dataset-specific particle format. Parsers assume the information they are getting has already been verified to be data that is needed. They use what the harvesters have supplied, trusting that the harvesters have chosen the right file in the sequence. The general logic for a parser is something like the following (see the sketch after this list):
    • Get a block of data from the stream
    • Get timestamp from stream and set it
    • Parse the sample
    • Build the particle
    • Return the built particle to the agent
    • Update position/state record
  • Drivers: The drivers tie the harvesters and parsers together. The driver is responsible for connecting the right harvester to the right parser to get the stream of raw data parsed, then back to the agent where it is published. The drivers are also responsible for making sure the harvester and parser position are recorded for the last published data point.
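
A sketch of the parser loop above, assuming simple "timestamp,value" text records; the class name, state keys, and helper structure are illustrative assumptions rather than the real MI parser base class:

class SimpleRecordParser(object):
    # Illustrative sketch only -- not the actual MI DatasetParser base class.

    def __init__(self, stream, state, state_callback, publish_callback):
        self._stream = stream                    # file-like object supplied by the harvester
        self._state = state or {'position': 0, 'timestamp': None}
        self._state_callback = state_callback    # reports position/state back to the driver
        self._publish_callback = publish_callback

    def get_records(self, max_count=1):
        particles = []
        while len(particles) < max_count:
            line = self._stream.readline()       # get a block of data from the stream
            if not line:
                break                            # end of file
            if not line.strip():
                continue                         # skip blank lines
            timestamp, _, value = line.partition(',')    # get the timestamp from the record
            particle = {'timestamp': float(timestamp),   # build the particle
                        'value': value.strip()}
            particles.append(particle)
            self._state['position'] = self._stream.tell()   # update position/state record
            self._state['timestamp'] = particle['timestamp']
            self._state_callback(dict(self._state))
        if particles:
            self._publish_callback(particles)    # hand the built particles back for publishing
        return particles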

Parsers may choose to load blocks of data from a stream (perhaps for efficiency or to limit working memory usage). If they do, they will need to handle the possibility of getting fragments or duplicate records. Using the chunker is common, as this is exactly what the chunker is designed to do. However, parsers may instead choose to load records individually, performing the chunker's behavior themselves as characters come in. Which approach is used may depend on the parser's needs or the dataset layout.
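
A minimal sketch of the block-reading approach, assuming newline-delimited records; the generator below plays the role of the chunker by holding fragments until they are completed (names are illustrative):

def iter_records(stream, block_size=1024):
    # Yield complete newline-delimited records from a stream read in blocks.
    # A partial record (fragment) at the end of a block is held in the buffer
    # until the rest of it arrives in a later block.
    buffer = ''
    while True:
        block = stream.read(block_size)
        if not block:
            break
        buffer += block
        while '\n' in buffer:
            record, buffer = buffer.split('\n', 1)
            yield record
    if buffer:                  # trailing fragment with no newline terminator
        yield buffer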

Regardless of how data is gathered, it is very important that the parser, harvester, and agent all work together to keep track of what data value was last published, where it came from in the dataset, and what other state might be associated with that data value (probably a timestamp at least). By tracking this information correctly, data won't easily be re-published into the system.
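
For example, the shared state might be no more than a small structure like the following (the exact keys are an assumption for illustration):

# Hypothetical example of the per-source state the driver stashes with the agent.
last_published_state = {
    'file': 'DATA0003.DAT',      # where the last published value came from
    'position': 4096,            # byte offset of the next unread record
    'timestamp': 3598473600.0,   # internal timestamp of the last published particle
}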

Code directory structure:

  • mi/dataset – base classes and common code for dataset drivers, parsers, and harvesters
  • mi/dataset/test – test modules for base classes
  • mi/dataset/parser – a place for parser code that can be used by drivers working with specific sites, nodes, or data sources. The module name may indicate the entity that the parser works with. Multiple similar parsers may be included in these modules if appropriate.
  • mi/dataset/driver – site-specific driver code and configuration (i.e., a particular mooring type); largely metadata/status/engineering parsers and harvesters live here. Files related to the platform type may be here, plus some tweaks for individual instances of that platform.
  • mi/dataset/driver/<path to individual driver>/resource - a place for resources (test data files, expected results files) used in the unit tests
  • mi/dataset/driver/<path to individual driver>/test - a place for unit tests for that driver

Configuration

Datasets should be fairly consistent, largely due to the consistent configuration of instruments on nodes attached to platforms. Defining the parsing of a dataset should largely be about specifying what instrument data sources are where (in the file hierarchy) and what they look like. Configuration may be done as a definition of a structure in a source file since the configuration should be fairly static. If dynamic configuration is needed outside of source, the config could be fed in from a YAML file (although the code must exist to read it if this is to happen). In general, the config should largely be a matter of specifying which parsers to load and how they should be fed their data. An example of such a configuration for a CTD on a HYPM platform might look like:
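
The example below is a minimal, hypothetical sketch only; the directory, file pattern, module path, and keys are illustrative assumptions rather than the actual HYPM CTD configuration:

# Hypothetical driver configuration sketch -- paths, patterns, and keys are
# illustrative assumptions only.
HYPM_CTD_CONFIG = {
    'harvester': {
        'directory': '/data/hypm/ctd',         # where the harvester looks for files
        'pattern': 'C*.DAT',                   # which files belong to this instrument
        'frequency': 30,                       # seconds between polls for new files
    },
    'parser': {
        'module': 'mi.dataset.parser.ctdpf',   # parser module to load
        'class': 'CtdpfParser',                # parser class within that module
    },
}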

The configuration for the surface-piercing profiler engineering data might look very similar, such as:
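
Again, this is only an illustrative sketch; every path, pattern, module name, and key is an assumption:

# Hypothetical configuration sketch for the SPP engineering data.
SPP_ENG_CONFIG = {
    'harvester': {
        'directory': '/data/spp/eng',
        'pattern': 'E*.LOG',
        'frequency': 60,
    },
    'parser': {
        'module': 'mi.dataset.parser.spp_eng',
        'class': 'SppEngParser',
        'dataset_format': 'binary_v1',         # special format flag discussed below
    },
}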

Note the following about the configurations above:

  • The SPP pressure instrument has logic specific to this installation of SPP (see the module). It may be possible to pull that logic out by subclassing something in the mi.dataset.instrument hierarchy if it is similar enough to be worthwhile. It also apparently needs a special format (see the dataset_format flag).
  • The CTD could be fairly generic, with the bulk of the logic (everything but the particles, really) being inherited from a base parser (probably one for binary values, regexes, CSV, NMEA, or similar formats). These base parsers have not yet been written as of 8/2013.

The agent/versioning split

Because the driver code will be loaded from an egg into the currently running OOIN container, we need a mechanism to avoid namespace collisions so that multiple versions of driver code can run concurrently. We achieve this by creating a unique namespace when the egg is created, using a concatenation of the driver name and version number. Therefore, when loading the driver class you must also include this unique namespace in the module name. For instance, when loading the ctdpf driver designed for the HYPM SPP:

  • Egg name: driver_hypm_ctd_0_0_4-0.0.4-py2.7.egg
  • Module: driver_hypm_ctd_0_0_4.mi.dataset.driver.hypm.ctd.driver
  • Class: HypmCTDPFDataSetDriver
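
A minimal sketch of how this versioned module could be loaded at run time, assuming the egg has already been added to sys.path (constructor arguments are omitted because they are driver-specific):

import importlib

# The versioned namespace from the egg is prepended to the normal module path.
module_name = 'driver_hypm_ctd_0_0_4.mi.dataset.driver.hypm.ctd.driver'
class_name = 'HypmCTDPFDataSetDriver'

module = importlib.import_module(module_name)   # assumes the egg is on sys.path
driver_class = getattr(module, class_name)
# driver = driver_class(...)                    # construct with the driver configuration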

When the egg package is created, the dependent files are staged along with non-Python resource files. All staged Python files are then automatically updated with the new namespace to ensure the proper version of the module is imported.

Code layout:

The agent has an entity called a dataset driver that handles the integration of the harvester and parser to load and parse each data source in the right way. The driver runs the control loop to do the following: get the next item off the harvester, feed it to the parsers, call back to the DSAgent for publishing, and stash the last successfully processed position with the agent.
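
A rough sketch of that control loop is shown below; the method names (get_next, get_records, get_state) and the callback structure are illustrative assumptions, not the actual DataSetDriver API:

def run_control_loop(harvester, make_parser, publish_callback, stash_position):
    # Illustrative only -- not the actual DataSetDriver implementation.
    while True:
        handle, source_state = harvester.get_next()    # get next off the harvester
        if handle is None:
            break                                      # nothing new to ingest right now
        parser = make_parser(handle, source_state)     # feed it to a parser
        particles = parser.get_records(10)
        while particles:
            publish_callback(particles)                # call back to the DSAgent for publishing
            stash_position(source_state, parser.get_state())  # stash last processed position
            particles = parser.get_records(10)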

The class structure is sketched in the PlantUML source below:

class DatasetParser << abstract >>
note top of DatasetParser
The DatasetParser lives in the MI world and handles the breaking down of
data streams into data particles. It should be customized so that it understands
the data formats it is encountering enough that it can build a particle. This may
happen through a somewhat generic parser with configuration, or a more custom
parser itself. When the end of the file is reached, the driver starts a new parser with a
filehandle from the harvester.
end note
DatasetParser <|-- CtdFormatAParser
class DatasetParser {
Chunker chunker_if_needed
__init__(config, stream, data_state, data_state_callback, publish_callback)
get_records(max_count)
set_state(state_obj)
}
class CtdFormatAParser<? extends DatasetParser>{
}

InstrumentAgent <|-- DataSetAgent
class DataSetAgent {
__init__(config)
..
publish_callback(particle)
stash_data_position(data_location_struct)
set_configuration()
..
_load_config():config_dict
DataSetDriver
}
note top of DataSetAgent
The DataSetAgent lives in the COI world and handles ION interactions
with system configuration, messaging, archiving, and publishing.
It is responsible for instantiating the appropriate DSOrganizer class
and accepting publish messages from it when something needs to be published.
As an ION process, the agent must be responsible for keeping track of what
data has already been processed.
end note

class Harvester << abstract >> {
connect_to_source():filehandle
}
note top of Harvester
The harvester lives in the MI world and handles the ingestion of data from
whatever source there is. It may be files, URLs, network sockets, etc.
The idea is that it takes data from the source and makes it available as a stream.
In the process, it keeps track of what data files were last opened.
end note
Harvester <|-- CustomDataHarvester
class CustomDataHarvester<? extends Harvester>{
}
Harvester <|-- IteratingFilenameHarvester
class IteratingFilenameHarvester<? extends Harvester>{
}
Harvester <|-- StreamedDataHarvester
class StreamedDataHarvester<? extends Harvester>{
}

class DataSetDriver << abstract >> {
Parser parsers[]
Harvester harvesters[]
__init__(driver_config, stashed_position_struct, position_callback, publish_callback)
start_sampling(position_struct)
stop_sampling()
_harvester_callbacks()
_harvester_errbacks()
_parser_callbacks()
_parser_errbacks()
}
note top of DataSetDriver
The DataSetDriver lives in the MI world and is responsible for coordinating
the activities of the Harvester bringing in data and the parser that is
generating the output data. It should contain concrete harvester and
parser instances. This class may be a rather generic driver that
does a read off the exposed file handle(s) from the contained Harvesters,
pushes it to a parser, then sends the result to the parent agent for publishing.
The driver may also be very specific and know exactly how to handle a particular
platform's handlers and parsers. It could even post-process whatever comes
out of the parser if it really needed to.

By having the driver, there is an umbrella of organization that can allow
multiple harvesters to work together, e.g. integrating a time or GPS file with a
data file.

This class can also aggregate the harvester and parser states so that the last
good data read can be stored in the ION system with this particular resource
agent.
end note
DataSetAgent *-- DataSetDriver
DataSetDriver *-- Harvester : 1..n
DataSetDriver *-- DatasetParser : 1..n
DataSetDriver <|-- PlatformXyzDriver
class PlatformXyzDriver<? extends DataSetDriver>{
}

Where the bundled egg would look like the following:
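
Based on the egg name and module path above, the layout would be roughly as follows (an illustrative sketch; __init__.py files and other contents are omitted):

driver_hypm_ctd_0_0_4-0.0.4-py2.7.egg
  driver_hypm_ctd_0_0_4/               <- unique versioned namespace
    mi/dataset/driver/hypm/ctd/
      driver.py                        <- HypmCTDPFDataSetDriver
      resource/                        <- non-Python resource files staged with the egg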

Execution Threads

Dataset agents are started in the container using the same mechanism as any other resource agent. The agent then dynamically loads a dataset driver plugin. A driver will contain one or more harvesters, and each harvester will spawn and manage its own thread. The driver will also launch a publisher thread where all parsing and particle publishing occur. While the driver manages the thread processing and polling, the driver writer is responsible for overloading the _poll() method to actually do the work in the thread.
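
A rough sketch of that threading pattern, assuming a simple polling base class (only the _poll() overload reflects the pattern described above; the class itself and its other names are assumptions):

import threading
import time

class PollingThread(object):
    # Illustrative base for a harvester or publisher thread.

    def __init__(self, interval=1.0):
        self._interval = interval
        self._running = False
        self._thread = None

    def start(self):
        self._running = True
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        self._running = False

    def _run(self):
        while self._running:
            self._poll()                  # driver writer overloads this to do the work
            time.sleep(self._interval)

    def _poll(self):
        raise NotImplementedError('driver writer must overload _poll()')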

Exception Handling

Main Dataset Agent Command Thread

Exceptions that occur in this thread are not expected error conditions; they are likely software bugs or configuration errors. In either case, DSA execution will stop.

Harvesters

All harvesters are passed a callback which is used when an exception is detected in their thread. This is treated as a connection loss, and we kick into our lost-connection routine: the agent transitions to a lost connection state and begins a progressive retry scheme where the retry time is calculated as a constant value multiplied by a value from a factorial sequence, with a 60-minute maximum.

Retry Scheme

We calculate retry times using a factorial algorithm capping at 60 minutes: the delay for the nth retry is the constant multiplied by the factorial sequence 1, 1, 2, 6, 24, ..., and once the cap is reached we retry every 60 minutes until the agent is shut down or the issue is corrected. Currently our constant is 20, so our retry times in seconds are:

20, 20, 40, 120, 480, 2400, 3600, 3600, ...

It is likely that we will transition out of the lost connection state successfully even though the error condition hasn't been cleared. This is because we can't detect a correction until the driver is started up again, which can only happen in a streaming state. Therefore we only clear the retry counter once we have gone without an exception for double the current retry time. If we have retried three times, the retry counter will not be reset until we have gone 80 seconds without an exception being detected.
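
In code form, the delay calculation amounts to something like the following (the constant and cap come from the text above; the function name is illustrative):

from math import factorial

RETRY_CONSTANT = 20      # seconds
RETRY_MAX = 3600         # 60 minute cap

def retry_delay(attempt):
    # Delay in seconds before retry number `attempt` (starting at 0).
    return min(RETRY_CONSTANT * factorial(attempt), RETRY_MAX)

# [retry_delay(n) for n in range(8)] -> [20, 20, 40, 120, 480, 2400, 3600, 3600]
# The retry counter is only reset after 2 * (current retry delay) seconds pass
# without a new exception, e.g. 2 * 40 = 80 seconds after the third retry.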

Alerts

When a harvester exception is detected, a ResourceAgentLostConnectionError alert is published, and the alert message is the serialized version of the exception. The exception is also logged as an error along with the exception (exc) information.

Publisher Thread

When the driver object is created, the agent passes a callback for publishing data, which is passed through to the parsers. When an exception occurs in this callback, the exception is trapped and the particle to be published is dropped. The agent connection id is also reset to ensure we can re-insert any missed samples if the exception was due to malformed data.
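
A minimal sketch of that trapping behavior; the agent methods used here (publish, reset_connection_id) are assumptions, not the actual agent API:

import logging

log = logging.getLogger(__name__)

def make_publish_callback(agent):
    # Build the publish callback that is handed through to the parsers.
    def publish_callback(particles):
        try:
            agent.publish(particles)        # hand the particles to ION for publishing
        except Exception:
            log.exception('Publish failed; dropping particles')
            agent.reset_connection_id()     # so missed samples can be re-inserted later
    return publish_callback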

Alerts

A ResourceAgentSampleError alert is published and the exception is logged.

Sequencing Data

Sequencing data is no longer needed, since ION can now handle out-of-order data.

Work still needed
