blob: 873817e0e8e01e2e2962cf2e66ffeaa8ca959554 [file] [log] [blame]
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Mounts all the projects required by a selected Build target.
For details on how filesystem overlays work see the filesystem overlays
section of the README.md.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import os
import subprocess
import tempfile
import xml.etree.ElementTree as ET
from . import config
BindMount = collections.namedtuple(
'BindMount', ['source_dir', 'readonly', 'allows_replacement'])
class BindOverlay(object):
"""Manages filesystem overlays of Android source tree using bind mounts.
"""
MAX_BIND_MOUNTS = 10000
def _HideDir(self, target_dir):
"""Temporarily replace the target directory for an empty directory.
Args:
target_dir: A string path to the target directory.
Returns:
A string path to the empty directory that replaced the target directory.
"""
empty_dir = tempfile.mkdtemp(prefix='empty_dir_')
self._AddBindMount(empty_dir, target_dir)
return empty_dir
def _FindBindMountConflict(self, path):
"""Finds any path in the bind mounts that conflicts with the provided path.
Args:
path: A string path to be checked.
Returns:
A tuple containing a string of the conflicting path in the bind mounts and
whether or not to allow this path to supersede any conflicts.
None, False if there was no conflict found.
"""
conflict_path = None
allows_replacement = False
for bind_destination, bind_mount in self._bind_mounts.items():
allows_replacement = bind_mount.allows_replacement
# Check if the path is a subdir or the bind destination
if path == bind_destination:
conflict_path = bind_mount.source_dir
break
elif path.startswith(bind_destination + os.sep):
relative_path = os.path.relpath(path, bind_destination)
path_in_source = os.path.join(bind_mount.source_dir, relative_path)
if os.path.exists(path_in_source) and os.listdir(path_in_source):
# A conflicting path exists within this bind mount
# and it's not empty
conflict_path = path_in_source
break
return conflict_path, allows_replacement
def _AddOverlay(self, source_dir, overlay_dir, intermediate_work_dir,
skip_subdirs, allowed_projects, destination_dir,
allowed_read_write, contains_read_write,
is_replacement_allowed):
"""Adds a single overlay directory.
Args:
source_dir: A string with the path to the Android platform source.
overlay_dir: A string path to the overlay directory to apply.
intermediate_work_dir: A string path to the intermediate work directory used as the
base for constructing the overlay filesystem.
skip_subdirs: A set of string paths to skip from overlaying.
allowed_projects: If not None, any .git project path not in this list
is excluded from overlaying.
destination_dir: A string with the path to the source with the overlays
applied to it.
allowed_read_write: A function returns true if the path input should
be allowed read/write access.
contains_read_write: A function returns true if the path input contains
a sub-path that should be allowed read/write access.
is_replacement_allowed: A function returns true if the path can replace a
subsequent path.
"""
# Traverse the overlay directory twice
# The first pass only process git projects
# The second time process all other files that are not in git projects
# We need to process all git projects first because
# the way we process a non-git directory will depend on if
# it contains a git project in a subdirectory or not.
dirs_with_git_projects = set('/')
for current_dir_origin, subdirs, files in os.walk(overlay_dir):
if current_dir_origin in skip_subdirs:
del subdirs[:]
continue
current_dir_relative = os.path.relpath(current_dir_origin, overlay_dir)
current_dir_destination = os.path.normpath(
os.path.join(destination_dir, current_dir_relative))
if '.git' in subdirs or '.git' in files or '.bindmount' in files:
# The current dir is a git project
# so just bind mount it
del subdirs[:]
if '.bindmount' in files or (not allowed_projects or
os.path.relpath(current_dir_origin, source_dir) in allowed_projects):
self._AddBindMount(
current_dir_origin, current_dir_destination,
False if allowed_read_write(current_dir_origin) else True,
is_replacement_allowed(
os.path.basename(overlay_dir), current_dir_relative))
current_dir_ancestor = current_dir_origin
while current_dir_ancestor and current_dir_ancestor not in dirs_with_git_projects:
dirs_with_git_projects.add(current_dir_ancestor)
current_dir_ancestor = os.path.dirname(current_dir_ancestor)
# Process all other files that are not in git projects
for current_dir_origin, subdirs, files in os.walk(overlay_dir):
if current_dir_origin in skip_subdirs:
del subdirs[:]
continue
if '.git' in subdirs or '.git' in files or '.bindmount' in files:
del subdirs[:]
continue
current_dir_relative = os.path.relpath(current_dir_origin, overlay_dir)
current_dir_destination = os.path.normpath(
os.path.join(destination_dir, current_dir_relative))
bindCurrentDir = True
# Directories with git projects can't be bind mounted
# because git projects are individually mounted
if current_dir_origin in dirs_with_git_projects:
bindCurrentDir = False
# A directory that contains read-write paths should only
# ever be bind mounted if the directory itself is read-write
if contains_read_write(current_dir_origin) and not allowed_read_write(current_dir_origin):
bindCurrentDir = False
if bindCurrentDir:
# The current dir can be bind mounted wholesale
del subdirs[:]
if allowed_read_write(current_dir_origin):
self._AddBindMount(current_dir_origin, current_dir_destination, False)
else:
self._AddBindMount(current_dir_origin, current_dir_destination, True)
continue
# If we've made it this far then we're going to process
# each file and subdir individually
for subdir in subdirs:
subdir_origin = os.path.join(current_dir_origin, subdir)
# Symbolic links to subdirectories
# have to be copied to the intermediate work directory.
# We can't bind mount them because bind mounts dereference
# symbolic links, and the build system filters out any
# directory symbolic links.
if os.path.islink(subdir_origin):
if subdir_origin not in skip_subdirs:
subdir_destination = os.path.join(intermediate_work_dir,
current_dir_relative, subdir)
self._CopyFile(subdir_origin, subdir_destination)
# bind each file individually then keep traversing
for file in files:
file_origin = os.path.join(current_dir_origin, file)
file_destination = os.path.join(current_dir_destination, file)
if allowed_read_write(file_origin):
self._AddBindMount(file_origin, file_destination, False)
else:
self._AddBindMount(file_origin, file_destination, True)
def _AddArtifactDirectories(self, source_dir, destination_dir, skip_subdirs):
"""Add directories that were not synced as workspace source.
Args:
source_dir: A string with the path to the Android platform source.
destination_dir: A string with the path to the source where the overlays
will be applied.
skip_subdirs: A set of string paths to be skipped from overlays.
Returns:
A list of string paths to be skipped from overlaying.
"""
# Ensure the main out directory exists
main_out_dir = os.path.join(source_dir, 'out')
if not os.path.exists(main_out_dir):
os.makedirs(main_out_dir)
for subdir in os.listdir(source_dir):
if subdir.startswith('out'):
out_origin = os.path.join(source_dir, subdir)
if out_origin in skip_subdirs:
continue
out_destination = os.path.join(destination_dir, subdir)
self._AddBindMount(out_origin, out_destination, False)
skip_subdirs.add(out_origin)
repo_origin = os.path.join(source_dir, '.repo')
if os.path.exists(repo_origin):
repo_destination = os.path.normpath(
os.path.join(destination_dir, '.repo'))
self._AddBindMount(repo_origin, repo_destination, True)
skip_subdirs.add(repo_origin)
return skip_subdirs
def _AddOverlays(self, source_dir, overlay_dirs, destination_dir,
skip_subdirs, allowed_projects, allowed_read_write,
contains_read_write, is_replacement_allowed):
"""Add the selected overlay directories.
Args:
source_dir: A string with the path to the Android platform source.
overlay_dirs: A list of strings with the paths to the overlay
directory to apply.
destination_dir: A string with the path to the source where the overlays
will be applied.
skip_subdirs: A set of string paths to be skipped from overlays.
allowed_projects: If not None, any .git project path not in this list
is excluded from overlaying.
allowed_read_write: A function returns true if the path input should
be allowed read/write access.
contains_read_write: A function returns true if the path input contains
a sub-path that should be allowed read/write access.
is_replacement_allowed: A function returns true if the path can replace a
subsequent path.
"""
# Create empty intermediate workdir
intermediate_work_dir = self._HideDir(destination_dir)
overlay_dirs.append(source_dir)
skip_subdirs = self._AddArtifactDirectories(source_dir, destination_dir,
skip_subdirs)
# Bind mount each overlay directory using a
# depth first traversal algorithm.
#
# The algorithm described works under the condition that the overlaid file
# systems do not have conflicting projects or that the conflict path is
# specifically called-out as a replacement path.
#
# The results of attempting to overlay two git projects on top
# of each other are unpredictable and may push the limits of bind mounts.
skip_subdirs.add(os.path.join(source_dir, 'overlays'))
for overlay_dir in overlay_dirs:
self._AddOverlay(source_dir, overlay_dir, intermediate_work_dir,
skip_subdirs, allowed_projects, destination_dir,
allowed_read_write, contains_read_write,
is_replacement_allowed)
def _AddBindMount(self,
source_dir,
destination_dir,
readonly=False,
allows_replacement=False):
"""Adds a bind mount for the specified directory.
Args:
source_dir: A string with the path of a source directory to bind.
It must already exist.
destination_dir: A string with the path ofa destination
directory to bind the source into. If it does not exist,
it will be created.
readonly: A flag to indicate whether this path should be bind mounted
with read-only access.
allow_replacement: A flag to indicate whether this path is allowed to replace a
conflicting path.
"""
conflict_path, replacement = self._FindBindMountConflict(destination_dir)
if conflict_path and not replacement:
raise ValueError("Project %s could not be overlaid at %s "
"because it conflicts with %s"
% (source_dir, destination_dir, conflict_path))
elif not conflict_path:
if len(self._bind_mounts) >= self.MAX_BIND_MOUNTS:
raise ValueError("Bind mount limit of %s reached" % self.MAX_BIND_MOUNTS)
self._bind_mounts[destination_dir] = BindMount(
source_dir=source_dir,
readonly=readonly,
allows_replacement=allows_replacement)
def _CopyFile(self, source_path, dest_path):
"""Copies a file to the specified destination.
Args:
source_path: A string with the path of a source file to copy. It must
exist.
dest_path: A string with the path to copy the file to. It should not
exist.
"""
dest_dir = os.path.dirname(dest_path)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
subprocess.check_call(['cp', '--no-dereference', source_path, dest_path])
def GetBindMounts(self):
"""Enumerates all bind mounts required by this Overlay.
Returns:
An ordered dict of BindMount objects keyed by destination path string.
The order of the bind mounts does matter, this is why it's an ordered
dict instead of a standard dict.
"""
return self._bind_mounts
def _GetReadWriteFunction(self, build_config, source_dir):
"""Returns a function that tells you how to mount a path.
Args:
build_config: A config.BuildConfig instance of the build target to be
prepared.
source_dir: A string with the path to the Android platform source.
Returns:
A function that takes a string path as an input and returns
True if the path should be mounted read-write or False if
the path should be mounted read-only.
"""
# The read/write allowlist provides paths relative to the source dir. It
# needs to be updated with absolute paths to make lookup possible.
rw_allowlist = {os.path.join(source_dir, p) for p in build_config.allow_readwrite}
def AllowReadWrite(path):
return build_config.allow_readwrite_all or path in rw_allowlist
return AllowReadWrite
def _GetContainsReadWriteFunction(self, build_config, source_dir):
"""Returns a function that tells you if a directory contains a read-write dir
Args:
build_config: A config.BuildConfig instance of the build target to be
prepared.
source_dir: A string with the path to the Android platform source.
Returns:
A function that takes a string path as an input and returns
True if the path contains a read-write path
"""
# Get all dirs with allowed read-write
# and all their ancestor directories
contains_rw = set()
for path in build_config.allow_readwrite:
while path not in ["", "/"]:
# The read/write allowlist provides paths relative to the source dir. It
# needs to be updated with absolute paths to make lookup possible.
contains_rw.add(os.path.join(source_dir, path))
path = os.path.dirname(path)
def ContainsReadWrite(path):
return build_config.allow_readwrite_all or path in contains_rw
return ContainsReadWrite
def _GetAllowedProjects(self, build_config):
"""Returns a set of paths that are allowed to contain .git projects.
Args:
build_config: A config.BuildConfig instance of the build target to be
prepared.
Returns:
If the target has an allowed projects file: a set of paths. Any .git
project path not in this set should be excluded from overlaying.
Otherwise: None
"""
if not build_config.allowed_projects_file:
return None
allowed_projects = ET.parse(build_config.allowed_projects_file)
paths = set()
for child in allowed_projects.getroot().findall("project"):
paths.add(child.attrib.get("path", child.attrib["name"]))
return paths
def _IsReplacementAllowedFunction(self, build_config):
"""Returns a function to determin if a given path is replaceable.
Args:
build_config: A config.BuildConfig instance of the build target to be
prepared.
Returns:
A function that takes an overlay name and string path as input and
returns True if the path is replaceable.
"""
def is_replacement_allowed_func(overlay_name, path):
for overlay in build_config.overlays:
if overlay_name == overlay.name and path in overlay.replacement_paths:
return True
return False
return is_replacement_allowed_func
def __init__(self,
build_target,
source_dir,
cfg,
whiteout_list = [],
destination_dir=None,
quiet=False):
"""Inits Overlay with the details of what is going to be overlaid.
Args:
build_target: A string with the name of the build target to be prepared.
source_dir: A string with the path to the Android platform source.
cfg: A config.Config instance.
whiteout_list: A list of directories to hide from the build system.
destination_dir: A string with the path where the overlay filesystem
will be created. If none is provided, the overlay filesystem
will be applied directly on top of source_dir.
quiet: A boolean that, when True, suppresses debug output.
"""
self._quiet = quiet
if not destination_dir:
destination_dir = source_dir
self._overlay_dirs = None
# The order of the bind mounts does matter, this is why it's an ordered
# dict instead of a standard dict.
self._bind_mounts = collections.OrderedDict()
# We will be repeateadly searching for items to skip so a set
# seems appropriate
skip_subdirs = set(whiteout_list)
build_config = cfg.get_build_config(build_target)
allowed_read_write = self._GetReadWriteFunction(build_config, source_dir)
contains_read_write = self._GetContainsReadWriteFunction(build_config, source_dir)
allowed_projects = self._GetAllowedProjects(build_config)
is_replacement_allowed = self._IsReplacementAllowedFunction(build_config)
overlay_dirs = []
for overlay in build_config.overlays:
overlay_dir = os.path.join(source_dir, 'overlays', overlay.name)
overlay_dirs.append(overlay_dir)
self._AddOverlays(
source_dir, overlay_dirs, destination_dir,
skip_subdirs, allowed_projects, allowed_read_write, contains_read_write,
is_replacement_allowed)
# If specified for this target, create a custom filesystem view
for path_relative_from, path_relative_to in build_config.views:
path_from = os.path.join(source_dir, path_relative_from)
if os.path.isfile(path_from) or os.path.isdir(path_from):
path_to = os.path.join(destination_dir, path_relative_to)
if allowed_read_write(path_from):
self._AddBindMount(path_from, path_to, False)
else:
self._AddBindMount(path_from, path_to, True)
else:
raise ValueError("Path '%s' must be a file or directory" % path_from)
self._overlay_dirs = overlay_dirs
if not self._quiet:
print('Applied overlays ' + ' '.join(self._overlay_dirs))