|
8 | 8 | "github.com/ipld/go-ipld-prime"
|
9 | 9 | cidlink "github.com/ipld/go-ipld-prime/linking/cid"
|
10 | 10 | "github.com/libp2p/go-libp2p-core/peer"
|
| 11 | + "go.opentelemetry.io/otel" |
| 12 | + "go.opentelemetry.io/otel/attribute" |
| 13 | + "go.opentelemetry.io/otel/trace" |
11 | 14 | "golang.org/x/xerrors"
|
12 | 15 |
|
13 | 16 | datatransfer "github.com/filecoin-project/go-data-transfer"
|
@@ -39,6 +42,15 @@ func (m *manager) OnChannelOpened(chid datatransfer.ChannelID) error {
|
39 | 42 | // calls revalidators so they can pause / resume the channel or send a
|
40 | 43 | // message over the transport.
|
41 | 44 | func (m *manager) OnDataReceived(chid datatransfer.ChannelID, link ipld.Link, size uint64, index int64) error {
|
| 45 | + ctx, _ := m.spansIndex.SpanForChannel(context.TODO(), chid) |
| 46 | + ctx, span := otel.Tracer("data-transfer").Start(ctx, "dataReceived", trace.WithAttributes( |
| 47 | + attribute.String("channelID", chid.String()), |
| 48 | + attribute.String("link", link.String()), |
| 49 | + attribute.Int64("index", index), |
| 50 | + attribute.Int64("size", int64(size)), |
| 51 | + )) |
| 52 | + defer span.End() |
| 53 | + |
42 | 54 | isNew, err := m.channels.DataReceived(chid, link.(cidlink.Link).Cid, size, index)
|
43 | 55 | if err != nil {
|
44 | 56 | return err
|
@@ -89,6 +101,15 @@ func (m *manager) OnDataQueued(chid datatransfer.ChannelID, link ipld.Link, size
|
89 | 101 | // The transport layer reports that some data has been queued up to be sent
|
90 | 102 | // to the requester, so fire a DataQueued event on the channels state
|
91 | 103 | // machine.
|
| 104 | + |
| 105 | + ctx, _ := m.spansIndex.SpanForChannel(context.TODO(), chid) |
| 106 | + ctx, span := otel.Tracer("data-transfer").Start(ctx, "dataQueued", trace.WithAttributes( |
| 107 | + attribute.String("channelID", chid.String()), |
| 108 | + attribute.String("link", link.String()), |
| 109 | + attribute.Int64("size", int64(size)), |
| 110 | + )) |
| 111 | + defer span.End() |
| 112 | + |
92 | 113 | isNew, err := m.channels.DataQueued(chid, link.(cidlink.Link).Cid, size)
|
93 | 114 | if err != nil {
|
94 | 115 | return nil, err
|
@@ -127,6 +148,15 @@ func (m *manager) OnDataQueued(chid datatransfer.ChannelID, link ipld.Link, size
|
127 | 148 | }
|
128 | 149 |
|
129 | 150 | func (m *manager) OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size uint64) error {
|
| 151 | + |
| 152 | + ctx, _ := m.spansIndex.SpanForChannel(context.TODO(), chid) |
| 153 | + ctx, span := otel.Tracer("data-transfer").Start(ctx, "dataSent", trace.WithAttributes( |
| 154 | + attribute.String("channelID", chid.String()), |
| 155 | + attribute.String("link", link.String()), |
| 156 | + attribute.Int64("size", int64(size)), |
| 157 | + )) |
| 158 | + defer span.End() |
| 159 | + |
130 | 160 | _, err := m.channels.DataSent(chid, link.(cidlink.Link).Cid, size)
|
131 | 161 | return err
|
132 | 162 | }
|
|
0 commit comments