@@ -4,6 +4,7 @@ extern crate serde_json;
4
4
extern crate kirby;
5
5
6
6
use aws_lambda_events:: event:: s3:: S3Event ;
7
+ use aws_lambda_events:: sns:: SnsEventObj ;
7
8
use flate2:: read:: GzDecoder ;
8
9
use lambda_runtime:: tracing:: { self , info, warn} ;
9
10
use lambda_runtime:: { Error , LambdaEvent , service_fn} ;
@@ -13,7 +14,7 @@ use rusoto_s3::*;
13
14
use std:: io:: BufRead ;
14
15
use std:: io:: BufReader ;
15
16
use std:: io:: Cursor ;
16
- use std :: io:: Read ;
17
+ use tokio :: io:: AsyncReadExt ;
17
18
18
19
use kirby:: Options ;
19
20
use kirby:: stream_stats;
@@ -35,8 +36,9 @@ async fn read_object(bucket_name: &str, key: &str) -> Box<dyn BufRead> {
35
36
result
36
37
. body
37
38
. unwrap ( )
38
- . into_blocking_read ( )
39
+ . into_async_read ( )
39
40
. read_to_end ( & mut bytes)
41
+ . await
40
42
. expect ( "Couldn't read object body stream" ) ;
41
43
42
44
if key. ends_with ( "gz" ) {
@@ -58,45 +60,47 @@ async fn write_object(bucket_name: &str, key: &str, body: &str) -> rusoto_s3::Pu
58
60
client. put_object ( req) . await . expect ( "Couldn't PUT object" )
59
61
}
60
62
61
- async fn func ( event : LambdaEvent < S3Event > ) -> Result < ( ) , Error > {
63
+ async fn func ( event : LambdaEvent < SnsEventObj < S3Event > > ) -> Result < ( ) , Error > {
62
64
let opts = Options {
63
65
paths : vec ! [ ] ,
64
66
verbose : false ,
65
67
unknown : false ,
66
68
} ;
67
69
68
70
for record in event. payload . records {
69
- let ( bucket_name, url_key) = match ( & record. s3 . bucket . name , & record. s3 . object . key ) {
70
- ( Some ( bucket_name) , Some ( url_key) ) => ( bucket_name, url_key) ,
71
- _ => {
72
- warn ! ( "missing bucket name or key for record {:?}" , record) ;
73
- continue ;
74
- }
75
- } ;
76
-
77
- let key = percent_decode ( url_key. as_bytes ( ) ) . decode_utf8 ( ) ?;
78
- info ! (
79
- "{} downloading {}/{}" ,
80
- time:: now_utc( ) . rfc3339( ) ,
81
- bucket_name,
82
- & key
83
- ) ;
84
- let reader = read_object ( bucket_name, & key) . await ;
85
-
86
- info ! ( "{} calculating stats..." , time:: now_utc( ) . rfc3339( ) ) ;
87
- let content = stream_stats ( reader, & opts) ;
88
-
89
- let result_key = [ & key, ".json" ]
90
- . concat ( )
91
- . replace ( "fastly_json" , "fastly_stats" ) ;
92
- info ! (
93
- "{} uploading results to {}" ,
94
- time:: now_utc( ) . rfc3339( ) ,
95
- & result_key
96
- ) ;
97
- write_object ( bucket_name, & result_key, & json ! ( content) . to_string ( ) ) . await ;
98
-
99
- info ! ( "{} done with {}" , time:: now_utc( ) . rfc3339( ) , & key) ;
71
+ for record in record. sns . message . records {
72
+ let ( bucket_name, url_key) = match ( & record. s3 . bucket . name , & record. s3 . object . key ) {
73
+ ( Some ( bucket_name) , Some ( url_key) ) => ( bucket_name, url_key) ,
74
+ _ => {
75
+ warn ! ( "missing bucket name or key for record {:?}" , record) ;
76
+ continue ;
77
+ }
78
+ } ;
79
+
80
+ let key = percent_decode ( url_key. as_bytes ( ) ) . decode_utf8 ( ) ?;
81
+ info ! (
82
+ "{} downloading {}/{}" ,
83
+ time:: now_utc( ) . rfc3339( ) ,
84
+ bucket_name,
85
+ & key
86
+ ) ;
87
+ let reader = read_object ( bucket_name, & key) . await ;
88
+
89
+ info ! ( "{} calculating stats..." , time:: now_utc( ) . rfc3339( ) ) ;
90
+ let content = stream_stats ( reader, & opts) ;
91
+
92
+ let result_key = [ & key, ".json" ]
93
+ . concat ( )
94
+ . replace ( "fastly_json" , "fastly_stats" ) ;
95
+ info ! (
96
+ "{} uploading results to {}" ,
97
+ time:: now_utc( ) . rfc3339( ) ,
98
+ & result_key
99
+ ) ;
100
+ write_object ( bucket_name, & result_key, & json ! ( content) . to_string ( ) ) . await ;
101
+
102
+ info ! ( "{} done with {}" , time:: now_utc( ) . rfc3339( ) , & key) ;
103
+ }
100
104
}
101
105
102
106
Ok ( ( ) )
0 commit comments