Skip to content

Commit

Permalink
test: langfuse - make polling more robust in tests (#1375)
Browse files Browse the repository at this point in the history
* langfuse - make polling more robust in tests

* improvements

* improvements

* increase sleep

* Trigger Build
  • Loading branch information
anakin87 authored Feb 14, 2025
1 parent bb3f16c commit 2d77b83
Showing 1 changed file with 36 additions and 40 deletions.
76 changes: 36 additions & 40 deletions integrations/langfuse/tests/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,29 @@
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"


def poll_langfuse(url: str):
"""Utility function to poll Langfuse API until the trace is ready"""
# Initial wait for trace creation
time.sleep(10)

auth = HTTPBasicAuth(os.environ["LANGFUSE_PUBLIC_KEY"], os.environ["LANGFUSE_SECRET_KEY"])

attempts = 5
delay = 1

while attempts > 0:
res = requests.get(url, auth=auth)
if res.status_code == 200:
return res

attempts -= 1
if attempts > 0:
time.sleep(delay)
delay *= 2

return res


@pytest.fixture
def pipeline_with_env_vars(llm_class, expected_trace):
"""Pipeline factory using environment variables for Langfuse authentication"""
Expand Down Expand Up @@ -109,27 +132,15 @@ def test_tracing_integration(llm_class, env_var, expected_trace, pipeline_fixtur
uuid = os.path.basename(urlparse(trace_url).path)
url = f"https://cloud.langfuse.com/api/public/traces/{uuid}"

# Poll the Langfuse API a bit as the trace might not be ready right away
attempts = 5
delay = 1
while attempts >= 0:
res = requests.get(
url, auth=HTTPBasicAuth(os.environ["LANGFUSE_PUBLIC_KEY"], os.environ["LANGFUSE_SECRET_KEY"])
)
if attempts > 0 and res.status_code != 200:
attempts -= 1
time.sleep(delay)
delay *= 2
continue
assert res.status_code == 200, f"Failed to retrieve data from Langfuse API: {res.status_code}"
res = poll_langfuse(url)
assert res.status_code == 200, f"Failed to retrieve data from Langfuse API: {res.status_code}"

# check if the trace contains the expected LLM name
assert expected_trace in str(res.content)
# check if the trace contains the expected generation span
assert "GENERATION" in str(res.content)
# check if the trace contains the expected user_id
assert "user_42" in str(res.content)
break
# check if the trace contains the expected LLM name
assert expected_trace in str(res.content)
# check if the trace contains the expected generation span
assert "GENERATION" in str(res.content)
# check if the trace contains the expected user_id
assert "user_42" in str(res.content)


def test_pipeline_serialization(monkeypatch):
Expand Down Expand Up @@ -237,24 +248,9 @@ def test_custom_span_handler():
uuid = os.path.basename(urlparse(trace_url).path)
url = f"https://cloud.langfuse.com/api/public/traces/{uuid}"

# we first need to wait for the response to be available and the trace to be created
time.sleep(5)

# Poll the Langfuse API
attempts = 5
delay = 1
while attempts >= 0:
res = requests.get(
url, auth=HTTPBasicAuth(os.environ["LANGFUSE_PUBLIC_KEY"], os.environ["LANGFUSE_SECRET_KEY"])
)
if attempts > 0 and res.status_code != 200:
attempts -= 1
time.sleep(delay)
delay *= 2
continue
res = poll_langfuse(url)
assert res.status_code == 200, f"Failed to retrieve data from Langfuse API: {res.status_code}"

assert res.status_code == 200
content = str(res.content)
assert "WARNING" in content
assert "Response too long" in content
break
content = str(res.content)
assert "WARNING" in content
assert "Response too long" in content

0 comments on commit 2d77b83

Please sign in to comment.