diff --git a/function/loader/load.go b/function/loader/load.go index 22c86449..dac2675f 100644 --- a/function/loader/load.go +++ b/function/loader/load.go @@ -55,3 +55,56 @@ func Load(ctx context.Context, m PubSubMessage) error { log.Printf("Job created: %s", job.ID()) return nil } + +func LoadStaticAnalysis(ctx context.Context, m PubSubMessage) error { + project := os.Getenv("GCP_PROJECT") + bucket := os.Getenv("OSSF_MALWARE_STATIC_ANALYSIS_RESULTS") + + bq, err := bigquery.NewClient(ctx, project) + if err != nil { + return fmt.Errorf("failed to create BigQuery client: %w", err) + + } + defer bq.Close() + + schema, err := bigquery.SchemaFromJSON(staticAnalysisSchemaJSON) + if err != nil { + return fmt.Errorf("failed to decode schema: %w", err) + } + + gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/*.json", bucket)) + gcsRef.Schema = schema + gcsRef.SourceFormat = bigquery.JSON + gcsRef.MaxBadRecords = 10000 + + dataset := bq.Dataset("packages") + loader := dataset.Table("staticanalysis").LoaderFrom(gcsRef) + loader.WriteDisposition = bigquery.WriteTruncate + loader.TimePartitioning = &bigquery.TimePartitioning{ + Type: bigquery.DayPartitioningType, + Field: "created", + } + + job, err := loader.Run(ctx) + if err != nil { + return fmt.Errorf("failed to create load job: %v", err) + } + + fmt.Printf("load job created: %s\n", job.ID()) + + status, err := job.Wait(ctx) + if err != nil { + return fmt.Errorf("error waiting for job: %w", err) + } + + if status.Err() != nil { + fmt.Printf("job completed with %d errors\n", len(status.Errors)) + for idx, err := range status.Errors { + fmt.Printf("error %d: %v\n", idx, err) + } + + return status.Err() + } + + return nil +}