Sctask0123222 aggregate error handling #13

Merged · 6 commits · Jan 9, 2024
1 change: 1 addition & 0 deletions conf/config.xml.example
@@ -63,6 +63,7 @@
<port>5672</port>
<pending-queue>timeseries_pending_aggregate</pending-queue>
<finished-queue>timeseries_finished_aggregate</finished-queue>
<failed-queue>timeseries_failed_aggregate</failed-queue>
</rabbit>

</config>
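
Note: the worker reads this new setting at connect time via $self->config->get( '/config/rabbit/failed-queue' ); see the _rabbit_connect changes in Worker.pm below.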
2 changes: 1 addition & 1 deletion grnoc-tsds-aggregate.spec
@@ -1,6 +1,6 @@
Summary: GRNOC TSDS Aggregate
Name: grnoc-tsds-aggregate
Version: 1.2.1
Version: 1.2.2
Release: 1%{?dist}
License: GRNOC
Group: Measurement
2 changes: 1 addition & 1 deletion lib/GRNOC/TSDS/Aggregate.pm
@@ -3,6 +3,6 @@ package GRNOC::TSDS::Aggregate;
use strict;
use warnings;

our $VERSION = "1.2.1";
our $VERSION = "1.2.2";

1;
255 changes: 148 additions & 107 deletions lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm
@@ -26,6 +26,7 @@ use constant QUEUE_FETCH_TIMEOUT => 10 * 1000;
use constant RECONNECT_TIMEOUT => 10;
use constant PENDING_QUEUE_CHANNEL => 1;
use constant FINISHED_QUEUE_CHANNEL => 2;
use constant FAILED_QUEUE_CHANNEL => 3;
use constant SERVICE_CACHE_FILE => '/etc/grnoc/name-service-cacher/name-service.xml';
use constant COOKIES_FILE => '/var/lib/grnoc/tsds/aggregate/cookies.dat';

@@ -322,7 +323,19 @@ sub _consume_messages {

try {

$self->_aggregate_messages( $aggregates_to_process ) if ( @$aggregates_to_process > 0 );
# initialize so $results is always defined, even when there was nothing to aggregate
my $results = { 'failed_messages' => [] };
$results = $self->_aggregate_messages( $aggregates_to_process ) if ( @$aggregates_to_process > 0 );

# push any failed messages to the failed queue
my $failed_messages = $results->{'failed_messages'};
if ( @$failed_messages > 0 ) {
$self->logger->error( "Failed to aggregate " . @$failed_messages . " messages." );
$self->rabbit->publish(
FAILED_QUEUE_CHANNEL,
$self->config->get( '/config/rabbit/failed-queue' ),
$self->json->encode( $failed_messages ),
{'exchange' => ''}
);
}
}

catch {
@@ -339,141 +352,162 @@ sub _aggregate_messages {
my ( $self, $messages ) = @_;

my $finished_messages = [];
my $results = {'failed_messages' => []};

foreach my $message ( @$messages ) {

my $type = $message->type;
my $from = $message->interval_from;
my $to = $message->interval_to;
my $start = $message->start;
my $end = $message->end;
my $meta = $message->meta;
my $values = $message->values;
my $required_meta = $message->required_meta;

# align to aggregation window we're getting data for
$start = nlowmult( $to, $start );
$end = nhimult( $to, $end );

my $min_max_mappings = $self->_get_min_max_mappings( required_meta => $required_meta,
meta => $meta );

my $hist_mappings = $self->_get_histogram_mappings( $values );

# craft the query needed to fetch the data from the necessary interval
my $from_clause = "from $type";
my $values_clause = $self->_get_values_clause( from => $from, values => $values, required_meta => $required_meta );
my $between_clause = $self->_get_between_clause( start => $start, end => $end, to => $to );
my $where_clause = $self->_get_where_clause( $meta );
my $by_clause = $self->_get_by_clause( $required_meta );
my $query = "$values_clause $between_clause $by_clause $from_clause $where_clause";

# issue the query to the webservice to retrieve the data we need to aggregate
$self->websvc->set_raw_output(1);
my $results = $self->websvc->query( query => $query,
output => 'bson');

# handle any errors attempting to query the webservice
if ( !$results || $self->websvc->get_error() ) {

die( "Error querying TSDS web service: " . $self->websvc->get_error() );
}
try {
my $type = $message->type;
my $from = $message->interval_from;
my $to = $message->interval_to;
my $start = $message->start;
my $end = $message->end;
my $meta = $message->meta;
my $values = $message->values;
my $required_meta = $message->required_meta;

# align to aggregation window we're getting data for
$start = nlowmult( $to, $start );
$end = nhimult( $to, $end );
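# (with Math::Round, e.g. $to = 3600, $start = 4000, $end = 7000:
# nlowmult( 3600, 4000 ) is 3600 and nhimult( 3600, 7000 ) is 7200,
# so we fetch the fully aligned window [3600, 7200])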

my $min_max_mappings = $self->_get_min_max_mappings( required_meta => $required_meta,
meta => $meta );

my $hist_mappings = $self->_get_histogram_mappings( $values );

# craft the query needed to fetch the data from the necessary interval
my $from_clause = "from $type";
my $values_clause = $self->_get_values_clause( from => $from, values => $values, required_meta => $required_meta );
my $between_clause = $self->_get_between_clause( start => $start, end => $end, to => $to );
my $where_clause = $self->_get_where_clause( $meta );
my $by_clause = $self->_get_by_clause( $required_meta );
my $query = "$values_clause $between_clause $by_clause $from_clause $where_clause";
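# the assembled query reads roughly like this (hypothetical field and
# meta names; the exact clause syntax comes from the _get_*_clause helpers):
# 'get values.input between (3600, 7200) by node from interface where node = "rtr-a"'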

# issue the query to the webservice to retrieve the data we need to aggregate
$self->websvc->set_raw_output(1);
my $results = $self->websvc->query( query => $query,
output => 'bson');

# handle any errors attempting to query the webservice
if ( !$results || $self->websvc->get_error() ) {

die( "Error querying TSDS web service: " . $self->websvc->get_error() );
}

$results = MongoDB::BSON->new()->decode_one($results);
$results = MongoDB::BSON->new()->decode_one($results);

if ( $results->{'error'} ) {
if ( $results->{'error'} ) {

die( "Error retrieving data from TSDS: " . $results->{'error_text'} );
}
die( "Error retrieving data from TSDS: " . $results->{'error_text'} );
}

$results = $results->{'results'};
$results = $results->{'results'};

my $buckets = {};
my $meta_info = {};
my $buckets = {};
my $meta_info = {};

foreach my $result ( @$results ) {
foreach my $result ( @$results ) {

my @value_types = keys( %$result );
my $meta_data = {};
my @meta_keys;
my @value_types = keys( %$result );
my $meta_data = {};
my @meta_keys;

# the required fields are not one of the possible value types
# we're also going to omit anything that came back as a result of
# aggregation
foreach my $required ( @$required_meta ) {
# the required fields are not one of the possible value types
# we're also going to omit anything that came back as a result of
# aggregation
foreach my $required ( @$required_meta ) {

@value_types = grep { $_ ne $required && $_ !~ /__(min|max|hist)$/ } @value_types;
$meta_data->{$required} = $result->{$required};
push( @meta_keys, $result->{$required} );
}
@value_types = grep { $_ ne $required && $_ !~ /__(min|max|hist)$/ } @value_types;
$meta_data->{$required} = $result->{$required};
push( @meta_keys, $result->{$required} );
}
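# (e.g. if @$required_meta is ['node'] and a result has the keys 'node',
# 'input' and 'input__max', then @value_types is reduced to just 'input')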

my $key = join( '__', @meta_keys );
$meta_info->{$key} = $meta_data;
# Put all of the data points into their respective floored
# buckets
foreach my $value_type ( @value_types ) {
my $key = join( '__', @meta_keys );
$meta_info->{$key} = $meta_data;
# Put all of the data points into their respective floored
# buckets
foreach my $value_type ( @value_types ) {

my $entries = $result->{$value_type};
my $entries = $result->{$value_type};

next if ( !defined( $entries ) );
next if ( !defined( $entries ) );

# Figure this out once, makes it easier later in the code to
# refer to a consistent flag
my $is_aggregate = exists($result->{$value_type . "__max"}) ? 1 : 0;
# Figure this out once, makes it easier later in the code to
# refer to a consistent flag
my $is_aggregate = exists($result->{$value_type . "__max"}) ? 1 : 0;

my $entries_max = $result->{$value_type . "__max"} || [];
my $entries_min = $result->{$value_type . "__min"} || [];
my $entries_hist = $result->{$value_type . "__hist"} || [];
my $entries_max = $result->{$value_type . "__max"} || [];
my $entries_min = $result->{$value_type . "__min"} || [];
my $entries_hist = $result->{$value_type . "__hist"} || [];

for (my $i = 0; $i < @$entries; $i++){
my $entry = $entries->[$i];
for (my $i = 0; $i < @$entries; $i++){
my $entry = $entries->[$i];

my ( $timestamp, $value ) = @$entry;
my ( $timestamp, $value ) = @$entry;

my $bucket = $to * int($timestamp / $to);
my $bucket = $to * int($timestamp / $to);
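# (e.g. with $to = 3600, timestamps 7200 through 10799 all floor to bucket 7200)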

push( @{$buckets->{$key}{$bucket}{$value_type}}, {is_aggregate => $is_aggregate,
avg => $value,
min => $entries_min->[$i][1],
max => $entries_max->[$i][1],
hist => $entries_hist->[$i][1],
timestamp => $timestamp}
);
}
}
}
push( @{$buckets->{$key}{$bucket}{$value_type}}, {is_aggregate => $is_aggregate,
avg => $value,
min => $entries_min->[$i][1],
max => $entries_max->[$i][1],
hist => $entries_hist->[$i][1],
timestamp => $timestamp}
);
}
}
}

# handle every measurement that was bucketed
my @keys = keys( %$buckets );
# handle every measurement that was bucketed
my @keys = keys( %$buckets );

foreach my $key ( @keys ) {
foreach my $key ( @keys ) {

# grab meta data hash to pass for this measurement
my $meta_data = $meta_info->{$key};
# grab meta data hash to pass for this measurement
my $meta_data = $meta_info->{$key};

# handle every bucketed timestamp for this measurement
my @timestamps = keys( %{$buckets->{$key}} );
# handle every bucketed timestamp for this measurement
my @timestamps = keys( %{$buckets->{$key}} );

foreach my $time ( @timestamps ) {
foreach my $time ( @timestamps ) {

# all the data during this bucket to aggregate for this measurement
my $data = $buckets->{$key}{$time};
# all the data during this bucket to aggregate for this measurement
my $data = $buckets->{$key}{$time};

my $aggregated = $self->_aggregate( data => $data,
required_meta => $required_meta,
hist_mappings => $hist_mappings,
hist_min_max_mappings => $min_max_mappings,
key => $key );
$aggregated->{'type'} = "$type.aggregate";
$aggregated->{'time'} = $time;
$aggregated->{'interval'} = $to;
$aggregated->{'meta'} = $meta_data;
my $aggregated = $self->_aggregate( data => $data,
required_meta => $required_meta,
hist_mappings => $hist_mappings,
hist_min_max_mappings => $min_max_mappings,
key => $key );
$aggregated->{'type'} = "$type.aggregate";
$aggregated->{'time'} = $time;
$aggregated->{'interval'} = $to;
$aggregated->{'meta'} = $meta_data;

push( @$finished_messages, $aggregated );
}
}
push( @$finished_messages, $aggregated );
}
}
}
catch {
# any failed aggregates are not added to 'finished_messages'
# and are instead pushed to a failed queue
$self->logger->error( "Error aggregating message: $_" );

# Convert Message object to hash (for encoding to JSON later)
my %failed_message = (
type => $message->type,
interval_from => $message->interval_from,
interval_to => $message->interval_to,
start => $message->start,
end => $message->end,
meta => $message->meta,
values => $message->values,
required_meta => $message->required_meta
);
push( @{$results->{'failed_messages'}}, \%failed_message );
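# the failed-queue payload published in _consume_messages is then a JSON
# array of these hashes, e.g. (hypothetical values):
# [{"type":"interface","interval_from":60,"interval_to":3600,"start":3600,
# "end":7200,"meta":{"node":"rtr-a"},"values":["input"],"required_meta":["node"]}]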
}
}

my $num = @$finished_messages;
@@ -487,6 +521,8 @@ sub _aggregate_messages {

$self->rabbit->publish( FINISHED_QUEUE_CHANNEL, $queue, $self->json->encode( \@finished_messages ), {'exchange' => ''} );
}

return $results;
}

sub _aggregate {
@@ -785,6 +821,7 @@ sub _rabbit_connect {
my $rabbit_port = $self->config->get( '/config/rabbit/port' );
my $rabbit_pending_queue = $self->config->get( '/config/rabbit/pending-queue' );
my $rabbit_finished_queue = $self->config->get( '/config/rabbit/finished-queue' );
my $rabbit_failed_queue = $self->config->get( '/config/rabbit/failed-queue' );

while ( 1 ) {

@@ -808,6 +845,10 @@
$rabbit->channel_open( FINISHED_QUEUE_CHANNEL );
$rabbit->queue_declare( FINISHED_QUEUE_CHANNEL, $rabbit_finished_queue, {'auto_delete' => 0} );

# open channel to the failed aggregate queue we'll send to
$rabbit->channel_open( FAILED_QUEUE_CHANNEL );
$rabbit->queue_declare( FAILED_QUEUE_CHANNEL, $rabbit_failed_queue, {'auto_delete' => 0} );

$self->_set_rabbit( $rabbit );

$connected = 1;
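
For completeness, here is a minimal sketch of a consumer that drains the new failed queue for inspection. It is not part of this PR; it assumes Net::AMQP::RabbitMQ (the client whose channel_open/queue_declare/publish signatures match the worker code above), default guest credentials on localhost, and the queue name from config.xml.example.

#!/usr/bin/perl
use strict;
use warnings;

use Net::AMQP::RabbitMQ;
use JSON::XS;

# connection details here are assumptions; match them to your rabbit config
my $mq = Net::AMQP::RabbitMQ->new();
$mq->connect( 'localhost', { user => 'guest', password => 'guest', port => 5672 } );
$mq->channel_open( 1 );

# same queue name as the new <failed-queue> entry in config.xml.example
my $queue = 'timeseries_failed_aggregate';
$mq->queue_declare( 1, $queue, { 'auto_delete' => 0 } );

# drain one message at a time; get() returns undef once the queue is empty
while ( my $msg = $mq->get( 1, $queue ) ) {

    # each body is a JSON array of failed aggregate message hashes
    my $failed = decode_json( $msg->{'body'} );

    foreach my $message ( @$failed ) {
        print "failed aggregate: type=$message->{'type'} start=$message->{'start'} end=$message->{'end'}\n";
    }
}

$mq->disconnect();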