Skip to content

Commit

Permalink
Revert "[Python]Don't stage beam SDK in Stager (#27436)"
Browse files Browse the repository at this point in the history
This reverts commit 408d766.
  • Loading branch information
damccorm authored Aug 22, 2023
1 parent a46a35b commit ace89c9
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 58 deletions.
1 change: 0 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@
## Breaking Changes

* Python SDK: Legacy runner support removed from Dataflow, all pipelines must use runner v2.
* [Python] Dataflow Runner will no longer stage Beam SDK from PyPI in the `--staging_location` at pipeline submission. Custom container images that are not based on Beam's default image must include an Apache Beam installation. ([#26996](https://github.com/apache/beam/issues/26996))

## Deprecations

Expand Down
10 changes: 4 additions & 6 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1296,13 +1296,11 @@ def _add_argparse_args(cls, parser):
'--sdk_location',
default='default',
help=(
'Path to a custom Beam SDK package to install and use on the'
'runner. It can be a URL, a GCS path, or a local path to an '
'Override the default location from where the Beam SDK is '
'downloaded. It can be a URL, a GCS path, or a local path to an '
'SDK tarball. Workflow submissions will download or copy an SDK '
'tarball from here. If set to "default", '
'runners will use the SDK provided in the default environment.'
'Use this flag when running pipelines with an unreleased or '
'manually patched version of Beam SDK.'))
'tarball from here. If set to the string "default", a standard '
'SDK location is used. If empty, no SDK is copied.'))
parser.add_argument(
'--extra_package',
'--extra_packages',
Expand Down
192 changes: 162 additions & 30 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ def commit_manifest(self):
"""Commits manifest."""
raise NotImplementedError

@staticmethod
def get_sdk_package_name():
  """Returns the PyPI package name to be staged.

  For internal use only; no backwards-compatibility guarantees.
  """
  sdk_package_name = names.BEAM_PACKAGE_NAME
  return sdk_package_name

@staticmethod
def _create_file_stage_to_artifact(local_path, staged_name):
return beam_runner_api_pb2.ArtifactInformation(
Expand Down Expand Up @@ -290,28 +296,30 @@ def create_job_resources(options, # type: PipelineOptions
setup_options.extra_packages, temp_dir=temp_dir))

if hasattr(setup_options, 'sdk_location'):
sdk_location = setup_options.sdk_location
if Stager._is_remote_path(sdk_location):
try:
resources.extend(
Stager._create_beam_sdk(
sdk_remote_location=setup_options.sdk_location,
temp_dir=temp_dir,
))
except:
raise RuntimeError(
'The --sdk_location option was used with an unsupported '
'type of location: %s' % sdk_location)

elif sdk_location == 'default':
# Use default location for a runner.
pass
elif sdk_location == 'container':
# Used in the past to indicate that SDK should be used from container
# image instead of being staged.
# Equivalent to 'default' now, leaving for backwards compatibility.

if (setup_options.sdk_location == 'default') or Stager._is_remote_path(
setup_options.sdk_location):
# If --sdk_location is not specified then the appropriate package
# will be obtained from PyPI (https://pypi.python.org) based on the
# version of the currently running SDK. If the option is
# present then no version matching is made and the exact URL or path
# is expected.
#
# Unit tests running in the 'python setup.py test' context will
# not have the sdk_location attribute present and therefore we
# will not stage SDK.
sdk_remote_location = 'pypi' if (
setup_options.sdk_location == 'default'
) else setup_options.sdk_location
resources.extend(
Stager._create_beam_sdk(sdk_remote_location, temp_dir))
elif setup_options.sdk_location == 'container':
# Use the SDK that's built into the container, rather than re-staging
# it.
pass
else:
# This branch is also used by internal tests running with the SDK
# built at head.
if os.path.isdir(setup_options.sdk_location):
sdk_path = os.path.join(
setup_options.sdk_location, names.STAGED_SDK_SOURCES_FILENAME)
Expand All @@ -337,6 +345,7 @@ def create_job_resources(options, # type: PipelineOptions
raise RuntimeError(
'The file "%s" cannot be found. Its location was specified '
'by the --sdk_location command-line option.' % sdk_path)

# The following artifacts are not processed by python sdk container boot
# sequence in a setup mode and hence should not be skipped even if a
# prebuilt sdk container image is used.
Expand Down Expand Up @@ -815,7 +824,8 @@ def _create_beam_sdk(sdk_remote_location, temp_dir):
Args:
sdk_remote_location: A URL from which the file can be downloaded or a
remote file location. The SDK file can be a tarball or a wheel.
remote file location. The SDK file can be a tarball or a wheel. Set
to 'pypi' to download and stage a wheel and source SDK from PyPi.
temp_dir: path to temporary location where the file should be
downloaded.
Expand All @@ -826,14 +836,136 @@ def _create_beam_sdk(sdk_remote_location, temp_dir):
Raises:
RuntimeError: if staging was not successful.
"""
if sdk_remote_location == 'pypi':
sdk_local_file = Stager._download_pypi_sdk_package(temp_dir)
sdk_sources_staged_name = Stager.\
_desired_sdk_filename_in_staging_location(sdk_local_file)
_LOGGER.info('Staging SDK sources from PyPI: %s', sdk_sources_staged_name)
staged_sdk_files = [
Stager._create_file_stage_to_artifact(
sdk_local_file, sdk_sources_staged_name)
]
try:
abi_suffix = 'm' if sys.version_info < (3, 8) else ''
# Stage binary distribution of the SDK, for now on a best-effort basis.
platform_tag = Stager._get_platform_for_default_sdk_container()
sdk_local_file = Stager._download_pypi_sdk_package(
temp_dir,
fetch_binary=True,
language_version_tag='%d%d' %
(sys.version_info[0], sys.version_info[1]),
abi_tag='cp%d%d%s' %
(sys.version_info[0], sys.version_info[1], abi_suffix),
platform_tag=platform_tag)
sdk_binary_staged_name = Stager.\
_desired_sdk_filename_in_staging_location(sdk_local_file)
_LOGGER.info(
'Staging binary distribution of the SDK from PyPI: %s',
sdk_binary_staged_name)
staged_sdk_files.append(
Stager._create_file_stage_to_artifact(
sdk_local_file, sdk_binary_staged_name))
except RuntimeError as e:
_LOGGER.warning(
'Failed to download requested binary distribution '
'of the SDK: %s',
repr(e))

sdk_remote_parsed = urlparse(sdk_remote_location)
sdk_remote_filename = os.path.basename(sdk_remote_parsed.path)
local_download_file = os.path.join(temp_dir, sdk_remote_filename)
Stager._download_file(sdk_remote_location, local_download_file)
staged_name = Stager._desired_sdk_filename_in_staging_location(
local_download_file)
_LOGGER.info('Staging Beam SDK from %s', sdk_remote_location)
return [
Stager._create_file_stage_to_artifact(local_download_file, staged_name)
return staged_sdk_files
elif Stager._is_remote_path(sdk_remote_location):
sdk_remote_parsed = urlparse(sdk_remote_location)
sdk_remote_filename = os.path.basename(sdk_remote_parsed.path)
local_download_file = os.path.join(temp_dir, sdk_remote_filename)
Stager._download_file(sdk_remote_location, local_download_file)
staged_name = Stager._desired_sdk_filename_in_staging_location(
local_download_file)
_LOGGER.info('Staging Beam SDK from %s', sdk_remote_location)
return [
Stager._create_file_stage_to_artifact(
local_download_file, staged_name)
]
else:
raise RuntimeError(
'The --sdk_location option was used with an unsupported '
'type of location: %s' % sdk_remote_location)

@staticmethod
def _download_pypi_sdk_package(
    temp_dir,
    fetch_binary=False,
    language_version_tag='39',
    language_implementation_tag='cp',
    abi_tag='cp39',
    platform_tag='manylinux2014_x86_64'):
  """Downloads the SDK package from PyPI and returns the local file path.

  The requested version is the version of the SDK distribution installed in
  the current environment. Dependencies are not downloaded (--no-deps).

  Args:
    temp_dir: directory that pip downloads the package into.
    fetch_binary: if True, download a wheel matching the tag arguments below;
      otherwise download a source distribution.
    language_version_tag: value passed to pip's --python-version, e.g. '39'.
    language_implementation_tag: value passed to pip's --implementation.
    abi_tag: value passed to pip's --abi.
    platform_tag: value passed to pip's --platform.

  Returns:
    Path of the downloaded file inside temp_dir.

  Raises:
    RuntimeError: if the SDK distribution is not installed, if the pip
      command fails, or if none of the expected files exists afterwards.
  """
  package_name = Stager.get_sdk_package_name()
  try:
    version = pkg_resources.get_distribution(package_name).version
  except pkg_resources.DistributionNotFound as e:
    raise RuntimeError(
        'Please set --sdk_location command-line option '
        'or install a valid {} distribution.'.format(package_name)) from e

  # Wheel file names (and sdist file names that follow PEP 625) use the
  # distribution name with '-' replaced by '_'.
  normalized_name = package_name.replace('-', '_')

  cmd_args = [
      Stager._get_python_executable(),
      '-m',
      'pip',
      'download',
      '--dest',
      temp_dir,
      '%s==%s' % (package_name, version),
      '--no-deps'
  ]

  if fetch_binary:
    _LOGGER.info('Downloading binary distribution of the SDK from PyPi')
    # Get a wheel distribution for the SDK from PyPI.
    cmd_args.extend([
        '--only-binary',
        ':all:',
        '--python-version',
        language_version_tag,
        '--implementation',
        language_implementation_tag,
        '--abi',
        abi_tag,
        '--platform',
        platform_tag
    ])
    # Example wheel with a manylinux2014 tag:
    # apache_beam-2.43.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl # pylint: disable=line-too-long
    # The file name carries a compound platform tag, so it is computed
    # separately from the tag that was passed to pip above.
    wheel_platform_tag = platform_tag
    if platform_tag == 'manylinux2014_x86_64':
      wheel_platform_tag = 'manylinux_2_17_x86_64.' + platform_tag
    expected_files = [
        os.path.join(
            temp_dir,
            '%s-%s-%s%s-%s-%s.whl' % (
                normalized_name,
                version,
                language_implementation_tag,
                language_version_tag,
                abi_tag,
                wheel_platform_tag)),
    ]
  else:
    _LOGGER.info('Downloading source distribution of the SDK from PyPi')
    cmd_args.extend(['--no-binary', ':all:'])
    expected_files = [
        os.path.join(temp_dir, '%s-%s.zip' % (package_name, version)),
        os.path.join(temp_dir, '%s-%s.tar.gz' % (package_name, version))
    ]
    # Also accept the normalized sdist name (e.g. apache_beam-<version>.tar.gz)
    # so a successful download is not reported as a failure.
    if normalized_name != package_name:
      expected_files.append(
          os.path.join(temp_dir, '%s-%s.tar.gz' % (normalized_name, version)))

  _LOGGER.info('Executing command: %s', cmd_args)
  try:
    processes.check_output(cmd_args)
  except processes.CalledProcessError as e:
    raise RuntimeError(repr(e)) from e

  for sdk_file in expected_files:
    if os.path.exists(sdk_file):
      return sdk_file

  raise RuntimeError(
      'Failed to download a distribution for the running SDK. '
      'Expected either one of %s to be found in the download folder.' %
      (expected_files))
92 changes: 89 additions & 3 deletions sdks/python/apache_beam/runners/portability/stager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,64 @@ def populate_requirements_cache(
self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')

def build_fake_pip_download_command_handler(self, has_wheels):
  """A stub for apache_beam.utils.processes.check_output that imitates pip.

  Args:
    has_wheels: Whether pip fake should have a whl distribution of packages.

  Returns:
    A callable that accepts the complete pip command line as a list and
    creates the file that the command would have downloaded.
  """
  def pip_fake(args):
    """Fakes fetching a package from pip by creating a temporary file.

    Args:
      args: a complete list of command line arguments to invoke pip.
        The fake is sensitive to the order of the arguments.
        Supported commands:

        1) Download SDK sources file:
           python -m pip download --dest /tmp/dir apache-beam==2.0.0
             --no-deps --no-binary :all:

        2) Download SDK binary wheel file:
           python -m pip download --dest /tmp/dir apache-beam==2.0.0
             --no-deps --only-binary :all: --python-version 39
             --implementation cp --abi cp39 --platform manylinux2014_x86_64

    Raises:
      RuntimeError: if a wheel is requested and has_wheels is False.
      AssertionError: if the command is not one of the supported forms.
    """
    package_file = None
    # args[8] is read below, so at least nine arguments are required; shorter
    # commands fall through to the "not supported" assertion.
    if len(args) >= 9 and '==' in args[6]:
      # package_name==x.y.z
      distribution_name, _, distribution_version = args[6].partition('==')

      if args[8] == '--no-binary':
        package_file = '%s-%s.zip' % (
            distribution_name, distribution_version)
      elif args[8] == '--only-binary' and len(args) >= 18:
        if not has_wheels:
          # Imitate the case when desired wheel distribution is not in PyPI.
          raise RuntimeError('No matching distribution.')

        # Per PEP-0427 in wheel filenames non-alphanumeric characters
        # in distribution name are replaced with underscore.
        distribution_name = distribution_name.replace('-', '_')
        # Work on a copy of the platform tag so the caller's argument list
        # is left untouched.
        platform_tag = args[17]
        if platform_tag == 'manylinux2014_x86_64':
          platform_tag = 'manylinux_2_17_x86_64.' + platform_tag
        package_file = '%s-%s-%s%s-%s-%s.whl' % (
            distribution_name,
            distribution_version,
            args[13],  # implementation
            args[11],  # python version
            args[15],  # abi tag
            platform_tag  # platform
        )

    assert package_file, 'Pip fake does not support the command: ' + str(args)
    self.create_temp_file(
        FileSystems.join(args[5], package_file), 'Package content.')

  return pip_fake

@mock.patch('apache_beam.runners.portability.stager.open')
@mock.patch('apache_beam.runners.portability.stager.get_new_http')
def test_download_file_https(self, mock_new_http, mock_open):
Expand Down Expand Up @@ -375,10 +433,38 @@ def test_sdk_location_default(self):
self.update_options(options)
options.view_as(SetupOptions).sdk_location = 'default'

_, staged_resources = self.stager.create_and_stage_job_resources(
options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)
with mock.patch(
'apache_beam.utils.processes.check_output',
self.build_fake_pip_download_command_handler(has_wheels=False)):
_, staged_resources = self.stager.create_and_stage_job_resources(
options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)

self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME], staged_resources)

with open(os.path.join(staging_dir,
names.STAGED_SDK_SOURCES_FILENAME)) as f:
self.assertEqual(f.read(), 'Package content.')

def test_sdk_location_default_with_wheels(self):
  """Checks that sdk_location='default' stages both the sdist and a wheel.

  pip is replaced by a fake that reports a wheel as available, so two files
  are expected in the staging directory: the SDK sources first, then a wheel.
  """
  staging_dir = self.make_temp_dir()

  options = PipelineOptions()
  self.update_options(options)
  options.view_as(SetupOptions).sdk_location = 'default'

  # staged_resources is only defined once staging has run, so nothing is
  # asserted on it before this block.
  with mock.patch(
      'apache_beam.utils.processes.check_output',
      self.build_fake_pip_download_command_handler(has_wheels=True)):
    _, staged_resources = self.stager.create_and_stage_job_resources(
        options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)

  self.assertEqual(len(staged_resources), 2)
  self.assertEqual(staged_resources[0], names.STAGED_SDK_SOURCES_FILENAME)
  # Exact name depends on the version of the SDK.
  self.assertTrue(staged_resources[1].endswith('whl'))
  for name in staged_resources:
    with open(os.path.join(staging_dir, name)) as f:
      self.assertEqual(f.read(), 'Package content.')

def test_sdk_location_local_directory(self):
staging_dir = self.make_temp_dir()
Expand Down
5 changes: 0 additions & 5 deletions sdks/python/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,11 +377,6 @@ func installSetupPackages(files []string, workDir string, requirementsFiles []st
log.Printf("Failed to setup acceptable wheel specs, leave it as empty: %v", err)
}

pkgName := "apache-beam"
isSdkInstalled := isPackageInstalled(pkgName)
if !isSdkInstalled {
return fmt.Errorf("Apache Beam is not installed in the runtime environment. If you use a custom container image, you must install apache-beam package in the custom image using same version of Beam as in the pipeline submission environment. For more information, see: the https://beam.apache.org/documentation/runtime/environments/.")
}
// Install the Dataflow Python SDK and worker packages.
// We install the extra requirements in case of using the beam sdk. These are ignored by pip
// if the user is using an SDK that does not provide these.
Expand Down
13 changes: 0 additions & 13 deletions sdks/python/container/piputil.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
"strings"

Expand Down Expand Up @@ -51,18 +50,6 @@ func pipInstallRequirements(files []string, dir, name string) error {
return nil
}

// isPackageInstalled reports whether the given package is installed in the
// environment, by running "python -m pip show <pkgName>".
//
// It returns false only when the command returns an *exec.ExitError. Any
// other outcome, including an error of a different type, is reported as true.
func isPackageInstalled(pkgName string) bool {
	err := exec.Command("python", "-m", "pip", "show", pkgName).Run()
	if err == nil {
		return true
	}
	_, isExitError := err.(*exec.ExitError)
	return !isExitError
}

// pipInstallPackage installs the given package, if present.
func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error {
for _, file := range files {
Expand Down

0 comments on commit ace89c9

Please sign in to comment.