This repository was archived by the owner on Nov 15, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain_build_wisdom2def.py
71 lines (62 loc) · 2.01 KB
/
main_build_wisdom2def.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import io
import wandb
import pandas as pd
from metaflow import FlowSpec, step, Parameter
from wandb.integration.metaflow import wandb_log
from storyteller.constants import WISDOM2DEF_RAW_A, WISDOM2DEF_RAW_B, WANDB_PROJECT
from storyteller.preprocess import cleanse, normalise, augment, upsample
from storyteller.utils import get_url
class BuildWisdom2DefFlow(FlowSpec):
    """
    Build the "wisdom2def" dataset and log it as a W&B artifact.

    The steps are chained as: start -> download -> preprocess -> end.
    """
    # version of the artifact to build; download() only accepts "a" or "b"
    ver: str = Parameter('ver',
                         type=str,
                         help='The version of this artifact. Should be a single alphabet',
                         default="a")
    # table parsed from the downloaded tab-separated text (set in download)
    raw_df: pd.DataFrame
    # raw_df after the preprocessing pipeline (set in preprocess)
    all_df: pd.DataFrame

    @step
    def start(self):
        """
        set ver to be available
        """
        self.next(self.download)

    @step
    def download(self):
        """
        ver -> raw_df

        Raises:
            ValueError: if ver is neither "a" nor "b".
        """
        # maps each supported version to the location of its raw data
        sources = {"a": WISDOM2DEF_RAW_A, "b": WISDOM2DEF_RAW_B}
        if self.ver not in sources:
            # say what was received and what is accepted, instead of a bare error
            raise ValueError(
                f"Unsupported ver: {self.ver!r}; expected one of {sorted(sources)}"
            )
        # NOTE(review): get_url presumably returns the response body as text;
        # it is wrapped in StringIO below so pandas can parse it - confirm
        text = get_url(sources[self.ver])
        self.raw_df = pd.read_csv(io.StringIO(text), delimiter="\t")
        self.next(self.preprocess)

    @step
    def preprocess(self):
        """
        raw_df -> all_df

        Applies cleanse, normalise, augment and upsample, in that order.
        """
        self.all_df = (
            self.raw_df
            .pipe(cleanse)
            .pipe(normalise)
            .pipe(augment)
            .pipe(upsample)
        )
        self.next(self.end)

    @step
    @wandb_log(settings=wandb.Settings(project=WANDB_PROJECT))
    def end(self):
        """
        raw_df, all_df
        -> raw_table, all_table
        -> artifact: upload this
        """
        # NOTE(review): wandb.log_artifact needs an active run; wandb_log is
        # expected to provide one around this step - confirm against W&B docs
        artifact = wandb.Artifact("wisdom2def", type="dataset")
        raw_table = wandb.Table(dataframe=self.raw_df)
        all_table = wandb.Table(dataframe=self.all_df)
        # add the tables to the artifact
        artifact.add(raw_table, "raw")
        artifact.add(all_table, "all")
        # alias the upload with its version letter as well as "latest"
        wandb.log_artifact(artifact, aliases=[self.ver, "latest"])
if __name__ == '__main__':
    # Script entry point: constructing the flow is the only action taken here.
    # NOTE(review): Metaflow is expected to take over command-line handling
    # (e.g. `run --ver b`) from the constructor - confirm against Metaflow docs.
    BuildWisdom2DefFlow()