From 25cf80e0ea43e1c336a10c212cc4598a4fe1b436 Mon Sep 17 00:00:00 2001 From: Ben Allan Date: Wed, 1 May 2019 14:52:39 -0600 Subject: [PATCH] add array, index, gzip, and alias support. --- ldms/man/ldms-csv-export-sos.man | 12 ++- ldms/scripts/ldms-csv-export-sos | 171 ++++++++++++++++++++++++------- 2 files changed, 144 insertions(+), 39 deletions(-) diff --git a/ldms/man/ldms-csv-export-sos.man b/ldms/man/ldms-csv-export-sos.man index 0b87e42c3..d93f7cff6 100644 --- a/ldms/man/ldms-csv-export-sos.man +++ b/ldms/man/ldms-csv-export-sos.man @@ -28,6 +28,7 @@ sos-import-csv. DATA is a file name of a LDMS .HEADER, .KIND, or data file. The file name and at least the first line of the file are digested to determine the content and the column types. LDMS CSV file name conventions ($schema[.$date] is associated with $schema.HEADER.$date or $schema.KIND.$date in the same directory). +The file may be gzipped; if so, the matching data/HEADER/KIND files must also be gzipped. .TP --blacklist= .br @@ -51,12 +52,16 @@ NAME overrides the default schema name determined from the data file name. .TP --schema-file= .br -Use an existing schema file FILE instead of generating a schema. When not specified, a schema file is always generated. +Use an existing schema file FILE instead of generating a schema. When not specified, a schema file is always generated. Schema files may not be gzipped. .TP --map-file= .br Override the output map file name derived from the data file name. .TP +--alias-file= +.br +Provide the list of metrics to rename when creating or matching a schema discovered from a header line. +.TP --strip-udata .br Suppress output of .userdata fields and remove .value suffix from schema element names. @@ -99,13 +104,12 @@ file is formatted for editability, and it should be adjusted before use with SOS if any guess or assumption proves erroneous. .SH BUGS -The initial release of this tool does not support export of LDMS CSV array metrics (other than char array), because their import specification is not yet defined for sos-import-csv. There is no pipeline filtering mode. .SH EXAMPLES -To use sos-import-csv with the resulting files: +To test sos-import-csv with the resulting files: .nf ldms-csv-export-sos --data=renamecsv.1553744481 \\ @@ -122,6 +126,8 @@ sos-import-csv \\ --map renamecsv.MAPSOS.1553744481 \\ --schema meminfo \\ --status +sos_cmd -C container -l +sos_cmd -C container -q -S meminfo -X Time .fi Other examples diff --git a/ldms/scripts/ldms-csv-export-sos b/ldms/scripts/ldms-csv-export-sos index 31f7c32a9..7054179e8 100755 --- a/ldms/scripts/ldms-csv-export-sos +++ b/ldms/scripts/ldms-csv-export-sos @@ -6,12 +6,12 @@ import os.path import json import os import tempfile +import gzip +import traceback # ASSUMPTIONS: # See name_to_ldms_kind(n) for canonical name to type assumptions. # the files processed are closed/stable, or fs races may occur. # NOTES: -#-Currently sos-import-csv does not support arrays. This -# takes a guess at what sos-import-csv support will look like. #-SOS does not have a single byte or char or uint8 type; widened # to u16. @@ -68,6 +68,12 @@ ldms_type_sos_map = { "char" : "uint16", match_elements = lambda p, s: [i for (q, i) in zip(s, range(len(s))) if p == q] +def fileopener(fn): + """return the correct open function for the filename (default or gzip.open)""" + if fn.endswith(".gz") or fn.endswith(".GZ"): + return gzip.open + return open + def sos_widen(s): if s in ["uint16", "uint32", "uint64"]: return "uint64" @@ -91,12 +97,36 @@ def ldms_type_to_sos_type(s): t = s[0:x+2] return ldms_type_sos_map[t] -def parse_header(fn): +def parse_aliases(fn): + """ load 2 column space separated alias file.""" + aliases = dict() + if not fn: + return aliases with open(fn, "r") as f: + for i in f: + if i.startswith("#"): + continue + y = i.strip().split() + if len(y) != 2: + print "misformatted alias in", fn, ":", y + sys.exit(1) + if y[0] in aliases: + print "redefined alias in", fn, ":", y[0] + sys.exit(1) + aliases[y[0]] = y[1] + return aliases + +def parse_header(fn, subs): + """load header line and swap any aliases immediately""" + with fileopener(fn)(fn, "r") as f: x = f.readline() y = x.lstrip("#") z = y.split(",") - return [item.strip().strip('"') for item in z] + q = [item.strip().strip('"') for item in z] + for i in range(len(q)): + if q[i] in subs: + q[i] = subs[q[i]] + return q def kind_is_scalar(n): x = n.rfind("[]") @@ -106,7 +136,7 @@ def kind_is_scalar(n): def parse_kind(fn, arr, udata, col_heads): """get raw kind list from kind file fn""" - with open(fn, "r") as f: + with fileopener(fn)(fn, "r") as f: x = f.readline().split("!") y = '!'.join(x[2:]) z = [i.rstrip() for i in y.split(",")] @@ -140,7 +170,7 @@ def parse_kind(fn, arr, udata, col_heads): def name_to_ldms_kind(n): """Map canonical headings to types""" - if n == "Time": + if n in ["Time", "#Time"] : return "timestamp" if n in ["ProducerName", "producer"]: return "char[]" @@ -219,7 +249,7 @@ def uncastable(old, new, val): def update_type(oldtype, value): """check oldtype and value and return promotion that works, or exit. A signed type never returns unless a negative is seen.""" - # types we may see:"char", "char[]" "s64", "s32", "s16", "s8" "u32", "u16", "u8" "u64" + # types we may see:"char", "char[]" "s64", "s32", "s16", "s8" "u32", "u16", "u8" "u64", "timestamp" x = value_type(value) if oldtype is None: return x @@ -234,6 +264,8 @@ def update_type(oldtype, value): if x == "d64": if oldtype in ["char", "char[]"]: return "char[]" + if oldtype == "timestamp": + return oldtype return x if x == "u64": if oldtype in ["char"]: @@ -248,7 +280,7 @@ def update_type(oldtype, value): if x == "u32": if oldtype in ["char"]: return "char[]" - if oldtype == ["u64", "d64", "char[]", "s64", "u32", "timestamp"]: + if oldtype in ["u64", "d64", "char[]", "s64", "u32", "timestamp"]: return oldtype if oldtype in ["u16", "u8"]: return x @@ -323,7 +355,7 @@ def guess_kind(col_heads, fn, maxlines): """ loop over maxlines of data file and examine each column to determine type""" line = 0 z = None; # array of type names to return - with open(fn, "r") as f: + with fileopener(fn)(fn, "r") as f: for x in f: if len(x) > 0 and x[0] == '#': # get defaulted types from header names if present @@ -357,7 +389,7 @@ def kind_format(fn): """Check named file for containing kind line. Return None or (arr, udata) booleans. """ - with open(fn, "r") as f: + with fileopener(fn)(fn, "r") as f: x = f.readline() try: fmt = x.split("!")[1] @@ -376,7 +408,7 @@ def kind_format(fn): def internal_header(fn): """Check named file for containing initial header line""" try: - with open(fn, "r") as f: + with fileopener(fn)(fn, "r") as f: x = f.readline() if len(x) > 5 and x[0:5] == "#Time": return True @@ -384,6 +416,13 @@ def internal_header(fn): except Exception: return False +def strip_gz(s): + print "strip_gz", s + sp = os.path.splitext(s) + if sp[1].lower() == ".gz": + return sp[0] + return s + def get_filenames(fn, args): """ Get canonical names of files based on a file name and ut options. @@ -396,36 +435,53 @@ def get_filenames(fn, args): if ".HEADER" in fn: header = fn schemafile = fn.replace(".HEADER",".SCHEMASOS") + schemafile = strip_gz(schemafile) mapfile = fn.replace(".HEADER",".MAPSOS") + mapfile = strip_gz(mapfile) kind = fn.replace(".HEADER",".KIND") data = fn.replace(".HEADER","") args.dataname = os.path.basename(data) elif ".KIND" in fn: kind = fn schemafile = fn.replace(".KIND",".SCHEMASOS") + schemafile = strip_gz(schemafile) mapfile = fn.replace(".KIND",".MAPSOS") + mapfile = strip_gz(mapfile) header = fn.replace(".KIND",".HEADER") data = fn.replace(".KIND","") args.dataname = os.path.basename(data) else: data = fn - x = fn.split(".")[-1] + sp = fn.split(".") + x = sp[-1] + gzipped = False + if x.lower() == "gz": + gzipped = True + x = sp[-2] try: ts = int(x) except ValueError: ts = None if ts is None: args.dataname = os.path.basename(fn) - header = fn + ".HEADER" - kind = fn + ".KIND" - schemafile = fn + ".SCHEMASOS" - mapfile = fn + ".MAPSOS" + if gzipped: + header = ".".join(sp[:-1]) + ".HEADER.gz" + kind = ".".join(sp[:-1]) + ".KIND.gz" + schemafile = ".".join(sp[:-1]) + ".SCHEMASOS" + mapfile = ".".join(sp[:-1]) + ".MAPSOS" + else: + header = fn + ".HEADER" + kind = fn + ".KIND" + schemafile = fn + ".SCHEMASOS" + mapfile = fn + ".MAPSOS" else: args.dataname = os.path.basename(fn).replace("." + x, "") header = fn.replace("." + x, ".HEADER." + x) kind = fn.replace("." + x, ".KIND." + x) schemafile = fn.replace("." + x, ".SCHEMASOS." + x) + schemafile = strip_gz(schemafile) mapfile = fn.replace("." + x, ".MAPSOS." + x) + mapfile = strip_gz(mapfile) if args.map_file: if os.path.isfile(mapfile) and args.map_file != mapfile: if args.verbose: @@ -447,15 +503,43 @@ def format_schema_prolog(f, args): "attrs" : [""" print >> f, s +def format_join(f, j, col_heads): + jname = j[0] + jcols = j[1:] + for i in jcols: + if not i in col_heads: + return + s = ",\n" + s += "\t{ \"name\" : \"" + jname + "\",\t\"type\" : \"join\",\t\"join_attrs\" : [" + for i in jcols: + if i != jcols[0]: + s += ", " + s += "\"" + i + "\"" + s += "], \"index\" : {} }" + print >>f, s, + +default_joins = [ + ("comp_time", "component_id", "Time"), + ("prod_time", "ProducerName", "Time"), + ("job_comp_time", "job_id", "component_id", "Time"), + ("job_prod_time", "job_id", "ProducerName", "Time"), + ("job_time_comp", "job_id", "Time", "component_id"), + ("job_time_prod", "job_id", "Time", "ProducerName") + ] + def format_schema_epilog(f, args, col_heads): + """add joins which are well defined from data in col_heads.""" + #fixme: need to call format_schema_epilog with tuple list from config file + # or default_joins if none given. + for i in default_joins: + format_join(f, i, col_heads) s="\n" - # check heads for indices and list joins s += """ ] }""" print >>f, s def is_index(n, args): - if n in ["Time", "component_id", "job_id", "component_id.value", "job_id.value", "ProducerName"]: + if n in ["Time", "component_id", "job_id", "component_id.value", "job_id.value", "ProducerName", "anonymized_host_ProducerName"]: return True return False @@ -490,13 +574,16 @@ def format_map_epilog(f): print >>f, "\n]" def format_map_element(f, k, metric, first): - # this may need minor revisions if sos-import-csv array markup - # is incompatible with simple list. + # accepts k as in or array of ints if first: s = "" else: s = ",\n" - s += "\t{ \"action\" : \"assign\", \"args\" : [\"" + metric + "\", " + str(k).strip("[]") + "] }" + if isinstance(k, int): + colspec = "\"column\" : " + str(k) + else: + colspec = "\"list\" : [" + str(k).strip("[]") + "]" + s += "\t{ \"target\" : \"" + metric + "\", \"source\" : { " + colspec + " }}" print >> f, s, def generate_map_new(col_heads, col_kinds, arr, udata, mapout, sout, mapvec): @@ -587,21 +674,25 @@ def generate_map_old(col_heads, col_kinds, arr, udata, mapout, sout, mapvec, str format_map_element(f, q, sout[k], first) first = False else: - print "schema element", sout[k], "not found in csv" + print "warning: schema element", sout[k], "not found in csv" print "u=", udata print "a=", arr print col_heads - sys.exit(1) - # should this be fatal or are sos records allowed to have empty fields? + # sos records are allowed to have empty fields at present k += 1 format_map_epilog(f) +metric_types = set() +for i in ldms_type_sos_map: + metric_types.add(ldms_type_sos_map[i]) + def parse_schema(fn): - with open(fn, "r") as f: + with fileopener(fn)(fn, "r") as f: sch = json.load(f) x = sch["name"] a = [] for i in sch["attrs"]: + if i["type"] in metric_types: a.append(i["name"]) return a @@ -614,7 +705,7 @@ def parse_metric_list(s): def parse_metric_file(fn): if not fn: return None - with open(fn, "r") as f: + with fileopener(fn)(fn, "r") as f: l = [] for x in f: y = x.strip() @@ -789,14 +880,20 @@ def generate_schema(col_heads, col_kinds, arr, udata, args, schemaout, mapout): format_schema_epilog(f, args, col_heads) if not old_schema: + if args.verbose: + print "rename to", schemaout os.rename(schematmp, schemaout) schematmp = None new_schema = parse_schema(schemaout) + if args.verbose: + print "map:", mapout generate_map_new(metvec, col_kinds, arr, udata, mapout, new_schema, mapvec) else: + if args.verbose: + print "mapold:", mapout generate_map_old(metvec, col_kinds, arr, udata, mapout, old_schema, mapvec, args.strip_udata) - except Exception: - pass + except Exception as e: + traceback.print_exc() if schematmp: os.unlink(schematmp) @@ -804,7 +901,7 @@ def generate_schema(col_heads, col_kinds, arr, udata, args, schemaout, mapout): if __name__ == "__main__": parser = argparse.ArgumentParser(description="Generate sos input files from csv") parser.add_argument("--data", default=None, - help="Specify the ldms data, .HEADER or .KIND file to parse. Adjacent files following naming conventions are expected.") + help="Specify the ldms data, .HEADER or .KIND file to parse. Adjacent files following naming conventions are expected. Files may be gzip compressed if ending in .gz") parser.add_argument("--blacklist", default=None, help="Specify file of column names to exclude from import, one per line. leading # comments allowed.") parser.add_argument("--whitelist", default=None, @@ -819,6 +916,8 @@ if __name__ == "__main__": help="Use existing schema file named instead of generating one.") parser.add_argument("--map-file", default=None, help="Override the output map file name derived from the data file name.") + parser.add_argument("--alias-file", default=None, + help="specify column names to replace with alternate names.") parser.add_argument("--strip-udata", action="store_true", help="Suppress output of userdata and .value suffix.") parser.add_argument("--guess", @@ -883,17 +982,17 @@ if __name__ == "__main__": assume_type = validate_assume(args.assume) if args.verbose: print "Assuming data columns are type", assume_type + if args.alias_file: + if not os.path.isfile(args.alias_file): + print "Error: --alias-file given is not found:", args.alias_file + sys.exit(1) + + subs = parse_aliases(args.alias_file) # raw header names list - col_heads = parse_header(header) + col_heads = parse_header(header, subs) # raw kinds list, which may be shorter than heads if unroll vs arrayed col_kinds = None - # temporarily, reject array-containing input, since there is not yet - # support for it in sos-import-csv - if arr: - print "Export failed:" - print "Import of array metrics is not yet supported by sos-import-csv." - sys.exit(1) if not (args.assume or args.guess): col_kinds = parse_kind(kind, arr, udata, col_heads) else: