Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HITL - Data collection #1967

Merged
merged 7 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions examples/hitl/rearrange_v2/app_state_end_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#!/usr/bin/env python3

# Copyright (c) Meta Platforms, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import os
import shutil
from typing import Optional

from app_data import AppData
from app_state_base import AppStateBase
from app_states import create_app_state_reset
from s3_upload import (
generate_unique_session_id,
make_s3_filename,
upload_file_to_s3,
)
from session import Session
from util import get_top_down_view

from habitat_hitl.app_states.app_service import AppService
from habitat_hitl.core.serialize_utils import save_as_json_gzip
from habitat_hitl.core.user_mask import Mask

# Duration of the end session message, before users are kicked.
SESSION_END_DELAY = 5.0


class AppStateEndSession(AppStateBase):
"""
* Indicate users that the session is terminated.
* Upload collected data.
"""

def __init__(
self, app_service: AppService, app_data: AppData, session: Session
):
super().__init__(app_service, app_data)
self._session = session
self._elapsed_time = 0.0
self._save_keyframes = False

self._status = "Session ended."
if len(session.error) > 0:
self._status += f"\nError: {session.error}"

def get_next_state(self) -> Optional[AppStateBase]:
if self._elapsed_time > SESSION_END_DELAY:
self._end_session()
return create_app_state_reset(self._app_service, self._app_data)
return None

def sim_update(self, dt: float, post_sim_update_dict):
# Top-down view.
cam_matrix = get_top_down_view(self._app_service.sim)
post_sim_update_dict["cam_transform"] = cam_matrix
self._app_service._client_message_manager.update_camera_transform(
cam_matrix, destination_mask=Mask.ALL
)

self._status_message(self._status)
self._elapsed_time += dt

def _end_session(self):
session = self._session
if session is None:
print("Null session. Skipping S3 upload.")
return

# Finalize session.
if self._session.error == "":
session.success = True
session.session_recorder.end_session(self._session.error)

# Get data collection parameters.
try:
config = self._app_service.config
data_collection_config = config.rearrange_v2.data_collection
s3_path = data_collection_config.s3_path
if s3_path[-1] != "/":
s3_path += "/"
s3_subdir = "complete" if session.success else "incomplete"
s3_path += s3_subdir

# Use the port as a discriminator for when there are multiple concurrent servers.
output_folder_suffix = str(config.habitat_hitl.networking.port)
output_folder = f"output_{output_folder_suffix}"

output_file_name = data_collection_config.output_file_name
output_file = f"{output_file_name}.json.gz"

except Exception as e:
print(f"Invalid data collection config. Skipping S3 upload. {e}")
return

# Delete previous output directory
if os.path.exists(output_folder):
shutil.rmtree(output_folder)
Comment on lines +96 to +97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just making sure this will not delete useful data?

Copy link
Contributor Author

@0mdc 0mdc May 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only contains the .json.gz file. The directory is expected to contain more data in the future (e.g. replay file, screenshots, etc).

You could however change the path in the config to any directory 🤔


# Create new output directory
os.makedirs(output_folder)
json_path = os.path.join(output_folder, output_file)
save_as_json_gzip(session.session_recorder, json_path)

# Generate unique session ID
session_id = generate_unique_session_id(
session.episode_ids, session.connection_records
)

# Upload output directory
orig_file_names = [
f
for f in os.listdir(output_folder)
if os.path.isfile(os.path.join(output_folder, f))
]
for orig_file_name in orig_file_names:
local_file_path = os.path.join(output_folder, orig_file_name)
s3_file_name = make_s3_filename(session_id, orig_file_name)
upload_file_to_s3(local_file_path, s3_file_name, s3_path)
125 changes: 125 additions & 0 deletions examples/hitl/rearrange_v2/app_state_load_episode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#!/usr/bin/env python3

# Copyright (c) Meta Platforms, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from typing import Optional

from app_data import AppData
from app_state_base import AppStateBase
from app_states import (
create_app_state_cancel_session,
create_app_state_end_session,
create_app_state_start_screen,
)
from session import Session
from util import get_top_down_view

from habitat_hitl.app_states.app_service import AppService
from habitat_hitl.core.user_mask import Mask


class AppStateLoadEpisode(AppStateBase):
"""
Load an episode.
A loading screen is shown while the content loads.
* If a next episode exists, launch RearrangeV2.
* If all episodes are done, end session.
* If any user disconnects, cancel the session.
"""

def __init__(
self, app_service: AppService, app_data: AppData, session: Session
):
super().__init__(app_service, app_data)
self._session = session
self._loading = True
self._session_ended = False
self._frame_number = 0
self._save_keyframes = False

def get_next_state(self) -> Optional[AppStateBase]:
if self._cancel:
return create_app_state_cancel_session(
self._app_service,
self._app_data,
self._session,
"User disconnected.",
)
if self._session_ended:
return create_app_state_end_session(
self._app_service, self._app_data, self._session
)
# When all clients finish loading, show the start screen.
if not self._loading:
return create_app_state_start_screen(
self._app_service, self._app_data, self._session
)
return None

def sim_update(self, dt: float, post_sim_update_dict):
self._status_message("Loading...")

# Skip a frame so that the status message reaches the client before the server loads the scene and blocks.
if self._frame_number == 1:
self._increment_episode()
# Once the scene loaded, show a top-down view.
elif self._frame_number > 1:
cam_matrix = get_top_down_view(self._app_service.sim)
post_sim_update_dict["cam_transform"] = cam_matrix
self._app_service._client_message_manager.update_camera_transform(
cam_matrix, destination_mask=Mask.ALL
)
# Wait for clients to signal that content finished loading on their end.
# HACK: The server isn't immediately aware that clients are loading. For now, we simply skip some frames.
# TODO: Use the keyframe ID from 'ClientMessageManager.set_server_keyframe_id()' to find the when the loading state is up-to-date.
if self._frame_number > 20:
any_client_loading = False
for user_index in range(self._app_data.max_user_count):
if self._app_service.remote_client_state._client_loading[
user_index
]:
any_client_loading = True
break
if not any_client_loading:
self._loading = False

self._frame_number += 1

def _increment_episode(self):
session = self._session
assert session.episode_ids is not None
if session.current_episode_index < len(session.episode_ids):
self._set_episode(session.current_episode_index)
session.current_episode_index += 1
else:
self._session_ended = True

def _set_episode(self, episode_index: int):
session = self._session

# Set the ID of the next episode to play in lab.
next_episode_id = session.episode_ids[episode_index]
print(f"Next episode index: {next_episode_id}.")
try:
next_episode_index = int(next_episode_id)
self._app_service.episode_helper.set_next_episode_by_index(
next_episode_index
)
except Exception as e:
print(f"ERROR: Invalid episode index {next_episode_id}. {e}")
print("Loading episode index 0.")
self._app_service.episode_helper.set_next_episode_by_index(0)

# Once an episode ID has been set, lab needs to be reset to load the episode.
self._app_service.end_episode(do_reset=True)

# Signal the clients that the scene has changed.
client_message_manager = self._app_service.client_message_manager
if client_message_manager:
client_message_manager.signal_scene_change(Mask.ALL)

# Save a keyframe. This propagates the new content to the clients, initiating client-side loading.
# Beware that the client "loading" state won't immediately be visible to the server.
self._app_service.sim.gfx_replay_manager.save_keyframe()
4 changes: 2 additions & 2 deletions examples/hitl/rearrange_v2/app_state_lobby.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from app_data import AppData
from app_state_base import AppStateBase
from app_states import create_app_state_rearrange
from app_states import create_app_state_start_session

from habitat_hitl.app_states.app_service import AppService

Expand Down Expand Up @@ -51,7 +51,7 @@ def get_next_state(self) -> Optional[AppStateBase]:
== self._app_data.max_user_count
and self._time_since_last_connection > START_SESSION_DELAY
):
return create_app_state_rearrange(
return create_app_state_start_session(
self._app_service, self._app_data
)
return None
Expand Down
125 changes: 125 additions & 0 deletions examples/hitl/rearrange_v2/app_state_start_screen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#!/usr/bin/env python3

# Copyright (c) Meta Platforms, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from typing import List, Optional

from app_data import AppData
from app_state_base import AppStateBase
from app_states import (
create_app_state_cancel_session,
create_app_state_rearrange,
)
from session import Session
from util import get_top_down_view

from habitat_hitl.app_states.app_service import AppService
from habitat_hitl.core.client_message_manager import UIButton
from habitat_hitl.core.key_mapping import KeyCode
from habitat_hitl.core.text_drawer import TextOnScreenAlignment
from habitat_hitl.core.user_mask import Mask

START_BUTTON_ID = "start"
START_SCREEN_TIMEOUT = 180.0
SKIP_START_SCREEN = False


class AppStateStartScreen(AppStateBase):
"""
Start screen with a "Start" button that all users must press before starting the session.
Cancellable.
"""

def __init__(
self, app_service: AppService, app_data: AppData, session: Session
):
super().__init__(app_service, app_data)
self._session = session
self._ready_to_start: List[bool] = [
False
] * self._app_data.max_user_count
self._elapsed_time: float = 0.0
self._timeout = False # TODO: Error management
self._save_keyframes = True

def get_next_state(self) -> Optional[AppStateBase]:
if self._cancel:
error = "Timeout" if self._timeout else "User disconnected"
return create_app_state_cancel_session(
self._app_service, self._app_data, self._session, error
)

# If all users pressed the "Start" button, begin the session.
ready_to_start = True
for user_ready in self._ready_to_start:
ready_to_start &= user_ready
if ready_to_start or SKIP_START_SCREEN:
return create_app_state_rearrange(
self._app_service, self._app_data, self._session
)

return None

def sim_update(self, dt: float, post_sim_update_dict):
# Top-down view.
cam_matrix = get_top_down_view(self._app_service.sim)
post_sim_update_dict["cam_transform"] = cam_matrix
self._app_service._client_message_manager.update_camera_transform(
cam_matrix, destination_mask=Mask.ALL
)

# Time limit to start the experiment.
self._elapsed_time += dt
remaining_time = START_SCREEN_TIMEOUT - self._elapsed_time
if remaining_time <= 0:
self._cancel = True
self._timeout = True
return
remaining_time_int = int(remaining_time)
title = f"New Session (Expires in: {remaining_time_int}s)"

# Show dialogue box with "Start" button.
for user_index in range(self._app_data.max_user_count):
button_pressed = (
self._app_service.remote_client_state.ui_button_pressed(
user_index, START_BUTTON_ID
)
)
self._ready_to_start[user_index] |= button_pressed

if not self._ready_to_start[user_index]:
self._app_service.client_message_manager.show_modal_dialogue_box(
title,
"Press 'Start' to begin the experiment.",
[UIButton(START_BUTTON_ID, "Start", True)],
destination_mask=Mask.from_index(user_index),
)
else:
self._app_service.client_message_manager.show_modal_dialogue_box(
title,
"Waiting for other participants...",
[UIButton(START_BUTTON_ID, "Start", False)],
destination_mask=Mask.from_index(user_index),
)

# Server-only: Press numeric keys to start episode on behalf of users.
if not self._app_service.hitl_config.experimental.headless.do_headless:
server_message = "Press numeric keys to start on behalf of users."
first_key = int(KeyCode.ONE)
for user_index in range(len(self._ready_to_start)):
if self._app_service.gui_input.get_key_down(
KeyCode(first_key + user_index)
):
self._ready_to_start[user_index] = True
user_ready = self._ready_to_start[user_index]
server_message += f"\n[{user_index + 1}]: User {user_index}: {'Ready' if user_ready else 'Not ready'}."

self._app_service.text_drawer.add_text(
server_message,
TextOnScreenAlignment.TOP_LEFT,
text_delta_x=0,
text_delta_y=-50,
destination_mask=Mask.NONE,
)
Loading