Revert "Support rolling over a percentage of workflows (#134816)"
This reverts commit fc890b55b51098437b6149abf1026a8b2aaee389.
Reverted https://github.com/pytorch/pytorch/pull/134816 on behalf of https://github.com/malfet because it causes lint to fail intermittently ([comment](https://github.com/pytorch/pytorch/pull/134816#issuecomment-2332902609))
diff --git a/.github/scripts/runner_determinator.py b/.github/scripts/runner_determinator.py
index c1cc82d..e52b19c 100644
--- a/.github/scripts/runner_determinator.py
+++ b/.github/scripts/runner_determinator.py
@@ -3,94 +3,49 @@
"""
This runner determinator is used to determine which set of runners to run a
GitHub job on. It uses the first comment of a GitHub issue (by default
-https://github.com/pytorch/test-infra/issues/5132) to define the configuration
-of which runners should be used to run which job.
-
-The configuration has two parts, the settings and a list of opted-in users,
-separated by a line containing "---". If the line is not present, the
-settings are considered to be empty with only the second part, the user
-list, defined.
-
-The first part is a YAML block that defines the rollout settings. This can be
-used to define any settings that are needed to determine which runners to use.
-It's fields are defined by the RolloutSettings class below.
-
-The second part is a list of users who are explicitly opted in to the LF fleet.
-The user list is also a comma separated list of additional features or
-experiments which the user could be opted in to.
+https://github.com/pytorch/test-infra/issues/5132) as a user list to determine
+which users will get their jobs to run on experimental runners. This user list
+is also a comma separated list of additional features or experiments which the
+user could be opted in to.
The user list has the following rules:
-- Users are GitHub usernames, which must start with the @ prefix
+- Users are GitHub usernames with the @ prefix
+- If the first line is a "*" then all users will use the new runners
+- If the first line is a "!" then all users will use the old runners
- Each user is also a comma-separated list of features/experiments to enable
-- A "#" prefix opts the user out of all experiments
+- A "#" prefix indicates the user is opted out of the new runners but is opting
+ into features/experiments.
-Example config:
- # A list of experiments that can be opted into.
- # This defines the behavior they'll induce when opted into.
- # Expected syntax is:
- # [experiment_name]: # Name of the experiment. Also used for the label prefix.
- # rollout_perc: [int] # % of workflows to run with this experiment when users are not opted in.
+Example user list:
- experiments:
- lf:
- rollout_percent: 25
-
- ---
-
- # Opt-ins:
- # Users can opt into the LF fleet by adding their GitHub username to this list
- # and specifying experiments to enable in a comma-separated list.
- # Experiments should be from the above list.
-
- @User1,lf,split_build
- @User2,lf
- @User3,split_build
+ @User1
+ @User2,amz2023
+ #@UserOptOutOfNewRunner,amz2023
"""
import logging
import os
-import random
from argparse import ArgumentParser
from logging import LogRecord
-from typing import Any, Dict, Iterable, List, NamedTuple, Tuple
+from typing import Any, Iterable
-import yaml
from github import Auth, Github
from github.Issue import Issue
-DEFAULT_LABEL_PREFIX = "" # use meta runners
+WORKFLOW_LABEL_META = "" # use meta runners
WORKFLOW_LABEL_LF = "lf." # use runners from the linux foundation
WORKFLOW_LABEL_LF_CANARY = "lf.c." # use canary runners from the linux foundation
+RUNNER_AMI_LEGACY = ""
+RUNNER_AMI_AMZ2023 = "amz2023"
+
GITHUB_OUTPUT = os.getenv("GITHUB_OUTPUT", "")
GH_OUTPUT_KEY_AMI = "runner-ami"
GH_OUTPUT_KEY_LABEL_TYPE = "label-type"
-SETTING_EXPERIMENTS = "experiments"
-
-LF_FLEET_EXPERIMENT = "lf"
-CANARY_FLEET_SUFFIX = ".c"
-
-
-class Experiment(NamedTuple):
- rollout_perc: int = (
- 0 # Percentage of workflows to experiment on when user is not opted-in.
- )
-
- # Add more fields as needed
-
-
-class Settings(NamedTuple):
- """
- Settings for the experiments that can be opted into.
- """
-
- experiments: Dict[str, Experiment] = {}
-
-
class ColorFormatter(logging.Formatter):
"""Color codes the log messages based on the log level"""
@@ -217,181 +172,85 @@
return branch.split("/")[0] in {"main", "nightly", "release", "landchecks"}
-def load_yaml(yaml_text: str) -> Any:
+def get_fleet(rollout_state: str, workflow_requestors: Iterable[str]) -> str:
+ """
+ Determines if the job should run on the LF fleet or the Meta fleet
+
+ Returns:
+ The appropriate label prefix for the runner, corresponding to the fleet to use.
+ This gets prefixed to the very start of the runner label.
+ """
+
try:
- data = yaml.safe_load(yaml_text)
- return data
- except yaml.YAMLError as exc:
- log.exception("Error loading YAML")
- raise
-
-
-def extract_settings_user_opt_in_from_text(rollout_state: str) -> Tuple[str, str]:
- """
- Extracts the text with settings, if any, and the opted in users from the rollout state.
-
- If the issue body contains "---" then the text above that is the settings
- and the text below is the list of opted in users.
-
- If it doesn't contain "---" then the settings are empty and the rest is the users.
- """
- rollout_state_parts = rollout_state.split("---")
- if len(rollout_state_parts) >= 2:
- return rollout_state_parts[0], rollout_state_parts[1]
- else:
- return "", rollout_state
-
-
-class UserOptins(Dict[str, List[str]]):
- """
- Dictionary of users with a list of features they have opted into
- """
-
-
-def parse_user_opt_in_from_text(user_optin_text: str) -> UserOptins:
- """
- Parse the user opt-in text into a key value pair of username and the list of features they have opted into
-
- Users are GitHub usernames with the @ prefix. Each user is also a comma-separated list of features/experiments to enable.
- - Example line: "@User1,lf,split_build"
- - A "#" prefix indicates the user is opted out of all experiments
-
-
- """
- optins = UserOptins()
- for user in user_optin_text.split("\n"):
- user = user.strip("\r\n\t -")
- if not user or not user.startswith("@"):
- # Not a valid user. Skip
- continue
-
- if user:
- usr_name = user.split(",")[0].strip("@")
- optins[usr_name] = [exp.strip(" ") for exp in user.split(",")[1:]]
-
- return optins
-
-
-def parse_settings_from_text(settings_text: str) -> Settings:
- """
- Parse the experiments from the issue body into a list of ExperimentSettings
- """
- try:
- if settings_text:
- # Escape the backtick as well so that we can have the settings in a code block on the GH issue
- # for easy reading
- # Note: Using ascii for the backtick so that the cat step in _runner-determinator.yml doesn't choke on
- # the backtick character in shell commands.
- backtick = chr(96) # backtick character
- settings_text = settings_text.strip(f"\r\n\t{backtick} ")
- settings = load_yaml(settings_text)
-
- # For now we just load experiments. We can expand this if/when we add more settings
- experiments = {}
-
- for exp_name, exp_settings in settings.get(SETTING_EXPERIMENTS).items():
- valid_settings = {}
- for setting in exp_settings:
- if setting not in Experiment._fields:
- log.warning(
- f"Unexpected setting in experiment: {setting} = {exp_settings[setting]}"
- )
- else:
- valid_settings[setting] = exp_settings[setting]
-
- experiments[exp_name] = Experiment(**valid_settings)
- return Settings(experiments)
+ if rollout_state[0] == "!":
+ log.info("LF Workflows are disabled for everyone. Using meta runners.")
+ return WORKFLOW_LABEL_META
+ elif rollout_state[0] == "*":
+ log.info("LF Workflows are enabled for everyone. Using LF runners.")
+ return WORKFLOW_LABEL_LF
+ else:
+ all_opted_in_users = {
+ usr_raw.strip("\n\t@ ").split(",")[0]
+ for usr_raw in rollout_state.split()
+ }
+ opted_in_requestors = {
+ usr for usr in workflow_requestors if usr in all_opted_in_users
+ }
+ if opted_in_requestors:
+ log.info(
+ f"LF Workflows are enabled for {', '.join(opted_in_requestors)}. Using LF runners."
+ )
+ return WORKFLOW_LABEL_LF
+ else:
+ log.info(
+ f"LF Workflows are disabled for {', '.join(workflow_requestors)}. Using meta runners."
+ )
+ return WORKFLOW_LABEL_META
except Exception as e:
- log.exception("Failed to parse settings")
-
- return Settings()
-
-
-def parse_settings(rollout_state: str) -> Settings:
- """
- Parse settings, if any, from the rollout state.
-
- If the issue body contains "---" then the text above that is the settings
- and the text below is the list of opted in users.
-
- If it doesn't contain "---" then the settings are empty and the default values are used.
- """
- settings_text, _ = extract_settings_user_opt_in_from_text(rollout_state)
- return parse_settings_from_text(settings_text)
-
-
-def parse_users(rollout_state: str) -> UserOptins:
- """
- Parse users from the rollout state.
-
- """
- _, users_text = extract_settings_user_opt_in_from_text(rollout_state)
- return parse_user_opt_in_from_text(users_text)
-
-
-def is_user_opted_in(user: str, user_optins: UserOptins, experiment_name: str) -> bool:
- """
- Check if a user is opted into an experiment
- """
- return experiment_name in user_optins.get(user, [])
-
-
-def get_runner_prefix(
- rollout_state: str, workflow_requestors: Iterable[str], is_canary: bool = False
-) -> str:
- settings = parse_settings(rollout_state)
- user_optins = parse_users(rollout_state)
-
- fleet_prefix = ""
- prefixes = []
- for experiment_name, experiment_settings in settings.experiments.items():
- enabled = False
-
- # Is any workflow_requestor opted in to this experiment?
- opted_in_users = [
- requestor
- for requestor in workflow_requestors
- if is_user_opted_in(requestor, user_optins, experiment_name)
- ]
-
- if opted_in_users:
- log.info(
- f"{', '.join(opted_in_users)} have opted into experiment {experiment_name}."
- )
- enabled = True
- else:
- # If no user is opted in, then we randomly enable the experiment based on the rollout percentage
- r = random.randint(1, 100)
- if r <= experiment_settings.rollout_perc:
- log.info(
- f"Based on rollout percentage of {experiment_settings.rollout_perc}%, enabling experiment {experiment_name}."
- )
- enabled = True
-
- if enabled:
- label = experiment_name
- if experiment_name == LF_FLEET_EXPERIMENT:
- # We give some special treatment to the "lf" experiment since determines the fleet we use
- # - If it's enabled, then we always list it's prefix first
- # - If we're in the canary branch, then we append ".c" to the lf prefix
- if is_canary:
- label += CANARY_FLEET_SUFFIX
- fleet_prefix = label
- else:
- prefixes.append(label)
-
- if len(prefixes) > 1:
log.error(
- f"Only a fleet and one other experiment can be enabled for a job at any time. Enabling {prefixes[0]} and ignoring the rest, which are {', '.join(prefixes[1:])}"
+ f"Failed to get determine workflow type. Falling back to meta runners. Exception: {e}"
)
- prefixes = prefixes[:1]
+ return WORKFLOW_LABEL_META
- # Fleet always comes first
- if fleet_prefix:
- prefixes.insert(0, fleet_prefix)
- return ".".join(prefixes) + "." if prefixes else ""
+def get_optin_feature(
+ rollout_state: str, workflow_requestors: Iterable[str], feature: str, fallback: str
+) -> str:
+ """
+ Used to dynamically opt in jobs to specific runner-type variants.
+
+ Returns:
+ The runner-type's variant name if the user has opted in to the feature, otherwise returns an empty string.
+ This variant name is prefixed to the runner-type in the label.
+ """
+ try:
+ userlist = {u.lstrip("#").strip("\n\t@ ") for u in rollout_state.split()}
+ all_opted_in_users = set()
+ for user in userlist:
+ for i in user.split(","):
+ if i == feature:
+ all_opted_in_users.add(user.split(",")[0])
+ opted_in_requestors = {
+ usr for usr in workflow_requestors if usr in all_opted_in_users
+ }
+
+ if opted_in_requestors:
+ log.info(
+ f"Feature {feature} is enabled for {', '.join(opted_in_requestors)}. Using feature {feature}."
+ )
+ return feature
+ else:
+ log.info(
+ f"Feature {feature} is disabled for {', '.join(workflow_requestors)}. Using fallback \"{fallback}\"."
+ )
+ return fallback
+
+ except Exception as e:
+ log.error(
+ f'Failed to determine if user has opted-in to feature {feature}. Using fallback "{fallback}". Exception: {e}'
+ )
+ return fallback
def get_rollout_state_from_issue(github_token: str, repo: str, issue_num: int) -> str:
@@ -409,10 +268,9 @@
args = parse_args()
if args.github_ref_type == "branch" and is_exception_branch(args.github_branch):
- log.info(
- f"Exception branch: '{args.github_branch}', using Meta runners and no experiments."
- )
- runner_label_prefix = DEFAULT_LABEL_PREFIX
+ log.info(f"Exception branch: '{args.github_branch}', using meta runners")
+ label_type = WORKFLOW_LABEL_META
+ runner_ami = RUNNER_AMI_LEGACY
else:
try:
rollout_state = get_rollout_state_from_issue(
@@ -427,18 +285,35 @@
args.github_branch,
)
- is_canary = args.github_repo == "pytorch/pytorch-canary"
-
- runner_label_prefix = get_runner_prefix(
- rollout_state, (args.github_issue_owner, username), is_canary
+ label_type = get_fleet(
+ rollout_state,
+ (
+ args.github_issue_owner,
+ username,
+ ),
)
-
+ runner_ami = get_optin_feature(
+ rollout_state=rollout_state,
+ workflow_requestors=(
+ args.github_issue_owner,
+ username,
+ ),
+ feature=RUNNER_AMI_AMZ2023,
+ fallback=RUNNER_AMI_LEGACY,
+ )
except Exception as e:
log.error(
- f"Failed to get issue. Defaulting to Meta runners and no experiments. Exception: {e}"
+ f"Failed to get issue. Falling back to meta runners. Exception: {e}"
)
+ label_type = WORKFLOW_LABEL_META
+ runner_ami = RUNNER_AMI_LEGACY
- set_github_output(GH_OUTPUT_KEY_LABEL_TYPE, runner_label_prefix)
+ # For Canary builds use canary runners
+ if args.github_repo == "pytorch/pytorch-canary" and label_type == WORKFLOW_LABEL_LF:
+ label_type = WORKFLOW_LABEL_LF_CANARY
+
+ set_github_output(GH_OUTPUT_KEY_LABEL_TYPE, label_type)
+ set_github_output(GH_OUTPUT_KEY_AMI, runner_ami)
if __name__ == "__main__":
diff --git a/.github/scripts/test_runner_determinator.py b/.github/scripts/test_runner_determinator.py
deleted file mode 100644
index 98063b1..0000000
--- a/.github/scripts/test_runner_determinator.py
+++ /dev/null
@@ -1,237 +0,0 @@
-from unittest import main, TestCase
-from unittest.mock import Mock, patch
-
-import runner_determinator as rd
-
-
-class TestRunnerDeterminatorIssueParser(TestCase):
- def test_parse_settings(self) -> None:
- settings_text = """
- experiments:
- lf:
- rollout_perc: 25
- otherExp:
- rollout_perc: 0
- ---
-
- Users:
- @User1,lf
- @User2,lf,otherExp
-
- """
-
- settings = rd.parse_settings(settings_text)
-
- self.assertTupleEqual(
- rd.Experiment(rollout_perc=25),
- settings.experiments["lf"],
- "lf settings not parsed correctly",
- )
- self.assertTupleEqual(
- rd.Experiment(rollout_perc=0),
- settings.experiments["otherExp"],
- "otherExp settings not parsed correctly",
- )
-
- def test_parse_settings_in_code_block(self) -> None:
- settings_text = """
-
- ```
- experiments:
- lf:
- rollout_perc: 25
- otherExp:
- rollout_perc: 0
-
- ```
-
- ---
-
- Users:
- @User1,lf
- @User2,lf,otherExp
-
- """
-
- settings = rd.parse_settings(settings_text)
-
- self.assertTupleEqual(
- rd.Experiment(rollout_perc=25),
- settings.experiments["lf"],
- "lf settings not parsed correctly",
- )
- self.assertTupleEqual(
- rd.Experiment(rollout_perc=0),
- settings.experiments["otherExp"],
- "otherExp settings not parsed correctly",
- )
-
- def test_parse_users(self) -> None:
- settings_text = """
- experiments:
- lf:
- rollout_perc: 25
- otherExp:
- rollout_perc: 0
- ---
-
- Users:
- @User1,lf
- @User2,lf,otherExp
-
- """
-
- users = rd.parse_users(settings_text)
- self.assertDictEqual(
- {"User1": ["lf"], "User2": ["lf", "otherExp"]},
- users,
- "Users not parsed correctly",
- )
-
- def test_parse_users_without_settings(self) -> None:
- settings_text = """
-
- @User1,lf
- @User2,lf,otherExp
-
- """
-
- users = rd.parse_users(settings_text)
- self.assertDictEqual(
- {"User1": ["lf"], "User2": ["lf", "otherExp"]},
- users,
- "Users not parsed correctly",
- )
-
-
-class TestRunnerDeterminatorGetRunnerPrefix(TestCase):
- def test_opted_in_user(self) -> None:
- settings_text = """
- experiments:
- lf:
- rollout_perc: 25
- otherExp:
- rollout_perc: 25
- ---
-
- Users:
- @User1,lf
- @User2,lf,otherExp
-
- """
- prefix = rd.get_runner_prefix(settings_text, ["User1"])
- self.assertEqual("lf.", prefix, "Runner prefix not correct for User1")
-
- def test_opted_in_user_two_experiments(self) -> None:
- settings_text = """
- experiments:
- lf:
- rollout_perc: 25
- otherExp:
- rollout_perc: 25
- ---
-
- Users:
- @User1,lf
- @User2,lf,otherExp
-
- """
- prefix = rd.get_runner_prefix(settings_text, ["User2"])
- self.assertEqual("lf.otherExp.", prefix, "Runner prefix not correct for User1")
-
- @patch("random.randint", return_value=50)
- def test_opted_out_user(self, mock_randint: Mock) -> None:
- settings_text = """
- experiments:
- lf:
- rollout_perc: 25
- otherExp:
- rollout_perc: 25
- ---
-
- Users:
- @User1,lf
- @User2,lf,otherExp
-
- """
- prefix = rd.get_runner_prefix(settings_text, ["User3"])
- self.assertEqual("", prefix, "Runner prefix not correct for user")
-
- @patch("random.randint", return_value=10)
- def test_opted_out_user_was_pulled_in_by_rollout(self, mock_randint: Mock) -> None:
- settings_text = """
- experiments:
- lf:
- rollout_perc: 25
- otherExp:
- rollout_perc: 25
- ---
-
- Users:
- @User1,lf
- @User2,lf,otherExp
-
- """
-
- # User3 is opted out, but is pulled into both experiments by the 10% rollout
- prefix = rd.get_runner_prefix(settings_text, ["User3"])
- self.assertEqual("lf.otherExp.", prefix, "Runner prefix not correct for user")
-
- def test_lf_prefix_always_comes_first(self) -> None:
- settings_text = """
- experiments:
- otherExp:
- rollout_perc: 0
- lf:
- rollout_perc: 0
- ---
-
- Users:
- @User1,lf
- @User2,otherExp,lf
-
- """
-
- prefix = rd.get_runner_prefix(settings_text, ["User2"])
- self.assertEqual("lf.otherExp.", prefix, "Runner prefix not correct for user")
-
- def test_ignores_commented_users(self) -> None:
- settings_text = """
- experiments:
- lf:
- rollout_perc: 0
- otherExp:
- rollout_perc: 0
- ---
-
- Users:
- #@User1,lf
- @User2,lf,otherExp
-
- """
-
- prefix = rd.get_runner_prefix(settings_text, ["User1"])
- self.assertEqual("", prefix, "Runner prefix not correct for user")
-
- def test_ignores_extra_experiments(self) -> None:
- settings_text = """
- experiments:
- lf:
- rollout_perc: 0
- otherExp:
- rollout_perc: 0
- foo:
- rollout_perc: 0
- ---
-
- Users:
- @User1,lf,otherExp,foo
-
- """
-
- prefix = rd.get_runner_prefix(settings_text, ["User1"])
- self.assertEqual("lf.otherExp.", prefix, "Runner prefix not correct for user")
-
-
-if __name__ == "__main__":
- main()
diff --git a/.github/workflows/_runner-determinator.yml b/.github/workflows/_runner-determinator.yml
index 5edb79d..8ba7093 100644
--- a/.github/workflows/_runner-determinator.yml
+++ b/.github/workflows/_runner-determinator.yml
@@ -62,94 +62,49 @@
"""
This runner determinator is used to determine which set of runners to run a
GitHub job on. It uses the first comment of a GitHub issue (by default
- https://github.com/pytorch/test-infra/issues/5132) to define the configuration
- of which runners should be used to run which job.
-
- The configuration has two parts, the settings and a list of opted-in users,
- separated by a line containing "---". If the line is not present, the
- settings are considered to be empty with only the second part, the user
- list, defined.
-
- The first part is a YAML block that defines the rollout settings. This can be
- used to define any settings that are needed to determine which runners to use.
- It's fields are defined by the RolloutSettings class below.
-
- The second part is a list of users who are explicitly opted in to the LF fleet.
- The user list is also a comma separated list of additional features or
- experiments which the user could be opted in to.
+ https://github.com/pytorch/test-infra/issues/5132) as a user list to determine
+ which users will get their jobs to run on experimental runners. This user list
+ is also a comma separated list of additional features or experiments which the
+ user could be opted in to.
The user list has the following rules:
- - Users are GitHub usernames, which must start with the @ prefix
+ - Users are GitHub usernames with the @ prefix
+ - If the first line is a "*" then all users will use the new runners
+ - If the first line is a "!" then all users will use the old runners
- Each user is also a comma-separated list of features/experiments to enable
- - A "#" prefix opts the user out of all experiments
+ - A "#" prefix indicates the user is opted out of the new runners but is opting
+ into features/experiments.
- Example config:
- # A list of experiments that can be opted into.
- # This defines the behavior they'll induce when opted into.
- # Expected syntax is:
- # [experiment_name]: # Name of the experiment. Also used for the label prefix.
- # rollout_perc: [int] # % of workflows to run with this experiment when users are not opted in.
+ Example user list:
- experiments:
- lf:
- rollout_percent: 25
-
- ---
-
- # Opt-ins:
- # Users can opt into the LF fleet by adding their GitHub username to this list
- # and specifying experiments to enable in a comma-separated list.
- # Experiments should be from the above list.
-
- @User1,lf,split_build
- @User2,lf
- @User3,split_build
+ @User1
+ @User2,amz2023
+ #@UserOptOutOfNewRunner,amz2023
"""
import logging
import os
- import random
from argparse import ArgumentParser
from logging import LogRecord
- from typing import Any, Dict, Iterable, List, NamedTuple, Tuple
+ from typing import Any, Iterable
- import yaml
from github import Auth, Github
from github.Issue import Issue
- DEFAULT_LABEL_PREFIX = "" # use meta runners
+ WORKFLOW_LABEL_META = "" # use meta runners
WORKFLOW_LABEL_LF = "lf." # use runners from the linux foundation
WORKFLOW_LABEL_LF_CANARY = "lf.c." # use canary runners from the linux foundation
+ RUNNER_AMI_LEGACY = ""
+ RUNNER_AMI_AMZ2023 = "amz2023"
+
GITHUB_OUTPUT = os.getenv("GITHUB_OUTPUT", "")
GH_OUTPUT_KEY_AMI = "runner-ami"
GH_OUTPUT_KEY_LABEL_TYPE = "label-type"
- SETTING_EXPERIMENTS = "experiments"
-
- LF_FLEET_EXPERIMENT = "lf"
- CANARY_FLEET_SUFFIX = ".c"
-
-
- class Experiment(NamedTuple):
- rollout_perc: int = (
- 0 # Percentage of workflows to experiment on when user is not opted-in.
- )
-
- # Add more fields as needed
-
-
- class Settings(NamedTuple):
- """
- Settings for the experiments that can be opted into.
- """
-
- experiments: Dict[str, Experiment] = {}
-
-
class ColorFormatter(logging.Formatter):
"""Color codes the log messages based on the log level"""
@@ -276,181 +231,85 @@
return branch.split("/")[0] in {"main", "nightly", "release", "landchecks"}
- def load_yaml(yaml_text: str) -> Any:
+ def get_fleet(rollout_state: str, workflow_requestors: Iterable[str]) -> str:
+ """
+ Determines if the job should run on the LF fleet or the Meta fleet
+
+ Returns:
+ The appropriate label prefix for the runner, corresponding to the fleet to use.
+ This gets prefixed to the very start of the runner label.
+ """
+
try:
- data = yaml.safe_load(yaml_text)
- return data
- except yaml.YAMLError as exc:
- log.exception("Error loading YAML")
- raise
-
-
- def extract_settings_user_opt_in_from_text(rollout_state: str) -> Tuple[str, str]:
- """
- Extracts the text with settings, if any, and the opted in users from the rollout state.
-
- If the issue body contains "---" then the text above that is the settings
- and the text below is the list of opted in users.
-
- If it doesn't contain "---" then the settings are empty and the rest is the users.
- """
- rollout_state_parts = rollout_state.split("---")
- if len(rollout_state_parts) >= 2:
- return rollout_state_parts[0], rollout_state_parts[1]
- else:
- return "", rollout_state
-
-
- class UserOptins(Dict[str, List[str]]):
- """
- Dictionary of users with a list of features they have opted into
- """
-
-
- def parse_user_opt_in_from_text(user_optin_text: str) -> UserOptins:
- """
- Parse the user opt-in text into a key value pair of username and the list of features they have opted into
-
- Users are GitHub usernames with the @ prefix. Each user is also a comma-separated list of features/experiments to enable.
- - Example line: "@User1,lf,split_build"
- - A "#" prefix indicates the user is opted out of all experiments
-
-
- """
- optins = UserOptins()
- for user in user_optin_text.split("\n"):
- user = user.strip("\r\n\t -")
- if not user or not user.startswith("@"):
- # Not a valid user. Skip
- continue
-
- if user:
- usr_name = user.split(",")[0].strip("@")
- optins[usr_name] = [exp.strip(" ") for exp in user.split(",")[1:]]
-
- return optins
-
-
- def parse_settings_from_text(settings_text: str) -> Settings:
- """
- Parse the experiments from the issue body into a list of ExperimentSettings
- """
- try:
- if settings_text:
- # Escape the backtick as well so that we can have the settings in a code block on the GH issue
- # for easy reading
- # Note: Using ascii for the backtick so that the cat step in _runner-determinator.yml doesn't choke on
- # the backtick character in shell commands.
- backtick = chr(96) # backtick character
- settings_text = settings_text.strip(f"\r\n\t{backtick} ")
- settings = load_yaml(settings_text)
-
- # For now we just load experiments. We can expand this if/when we add more settings
- experiments = {}
-
- for exp_name, exp_settings in settings.get(SETTING_EXPERIMENTS).items():
- valid_settings = {}
- for setting in exp_settings:
- if setting not in Experiment._fields:
- log.warning(
- f"Unexpected setting in experiment: {setting} = {exp_settings[setting]}"
- )
- else:
- valid_settings[setting] = exp_settings[setting]
-
- experiments[exp_name] = Experiment(**valid_settings)
- return Settings(experiments)
+ if rollout_state[0] == "!":
+ log.info("LF Workflows are disabled for everyone. Using meta runners.")
+ return WORKFLOW_LABEL_META
+ elif rollout_state[0] == "*":
+ log.info("LF Workflows are enabled for everyone. Using LF runners.")
+ return WORKFLOW_LABEL_LF
+ else:
+ all_opted_in_users = {
+ usr_raw.strip("\n\t@ ").split(",")[0]
+ for usr_raw in rollout_state.split()
+ }
+ opted_in_requestors = {
+ usr for usr in workflow_requestors if usr in all_opted_in_users
+ }
+ if opted_in_requestors:
+ log.info(
+ f"LF Workflows are enabled for {', '.join(opted_in_requestors)}. Using LF runners."
+ )
+ return WORKFLOW_LABEL_LF
+ else:
+ log.info(
+ f"LF Workflows are disabled for {', '.join(workflow_requestors)}. Using meta runners."
+ )
+ return WORKFLOW_LABEL_META
except Exception as e:
- log.exception("Failed to parse settings")
-
- return Settings()
-
-
- def parse_settings(rollout_state: str) -> Settings:
- """
- Parse settings, if any, from the rollout state.
-
- If the issue body contains "---" then the text above that is the settings
- and the text below is the list of opted in users.
-
- If it doesn't contain "---" then the settings are empty and the default values are used.
- """
- settings_text, _ = extract_settings_user_opt_in_from_text(rollout_state)
- return parse_settings_from_text(settings_text)
-
-
- def parse_users(rollout_state: str) -> UserOptins:
- """
- Parse users from the rollout state.
-
- """
- _, users_text = extract_settings_user_opt_in_from_text(rollout_state)
- return parse_user_opt_in_from_text(users_text)
-
-
- def is_user_opted_in(user: str, user_optins: UserOptins, experiment_name: str) -> bool:
- """
- Check if a user is opted into an experiment
- """
- return experiment_name in user_optins.get(user, [])
-
-
- def get_runner_prefix(
- rollout_state: str, workflow_requestors: Iterable[str], is_canary: bool = False
- ) -> str:
- settings = parse_settings(rollout_state)
- user_optins = parse_users(rollout_state)
-
- fleet_prefix = ""
- prefixes = []
- for experiment_name, experiment_settings in settings.experiments.items():
- enabled = False
-
- # Is any workflow_requestor opted in to this experiment?
- opted_in_users = [
- requestor
- for requestor in workflow_requestors
- if is_user_opted_in(requestor, user_optins, experiment_name)
- ]
-
- if opted_in_users:
- log.info(
- f"{', '.join(opted_in_users)} have opted into experiment {experiment_name}."
- )
- enabled = True
- else:
- # If no user is opted in, then we randomly enable the experiment based on the rollout percentage
- r = random.randint(1, 100)
- if r <= experiment_settings.rollout_perc:
- log.info(
- f"Based on rollout percentage of {experiment_settings.rollout_perc}%, enabling experiment {experiment_name}."
- )
- enabled = True
-
- if enabled:
- label = experiment_name
- if experiment_name == LF_FLEET_EXPERIMENT:
- # We give some special treatment to the "lf" experiment since determines the fleet we use
- # - If it's enabled, then we always list it's prefix first
- # - If we're in the canary branch, then we append ".c" to the lf prefix
- if is_canary:
- label += CANARY_FLEET_SUFFIX
- fleet_prefix = label
- else:
- prefixes.append(label)
-
- if len(prefixes) > 1:
log.error(
- f"Only a fleet and one other experiment can be enabled for a job at any time. Enabling {prefixes[0]} and ignoring the rest, which are {', '.join(prefixes[1:])}"
+ f"Failed to get determine workflow type. Falling back to meta runners. Exception: {e}"
)
- prefixes = prefixes[:1]
+ return WORKFLOW_LABEL_META
- # Fleet always comes first
- if fleet_prefix:
- prefixes.insert(0, fleet_prefix)
- return ".".join(prefixes) + "." if prefixes else ""
+ def get_optin_feature(
+ rollout_state: str, workflow_requestors: Iterable[str], feature: str, fallback: str
+ ) -> str:
+ """
+ Used to dynamically opt in jobs to specific runner-type variants.
+
+ Returns:
+ The runner-type's variant name if the user has opted in to the feature, otherwise returns an empty string.
+ This variant name is prefixed to the runner-type in the label.
+ """
+ try:
+ userlist = {u.lstrip("#").strip("\n\t@ ") for u in rollout_state.split()}
+ all_opted_in_users = set()
+ for user in userlist:
+ for i in user.split(","):
+ if i == feature:
+ all_opted_in_users.add(user.split(",")[0])
+ opted_in_requestors = {
+ usr for usr in workflow_requestors if usr in all_opted_in_users
+ }
+
+ if opted_in_requestors:
+ log.info(
+ f"Feature {feature} is enabled for {', '.join(opted_in_requestors)}. Using feature {feature}."
+ )
+ return feature
+ else:
+ log.info(
+ f"Feature {feature} is disabled for {', '.join(workflow_requestors)}. Using fallback \"{fallback}\"."
+ )
+ return fallback
+
+ except Exception as e:
+ log.error(
+ f'Failed to determine if user has opted-in to feature {feature}. Using fallback "{fallback}". Exception: {e}'
+ )
+ return fallback
def get_rollout_state_from_issue(github_token: str, repo: str, issue_num: int) -> str:
@@ -468,10 +327,9 @@
args = parse_args()
if args.github_ref_type == "branch" and is_exception_branch(args.github_branch):
- log.info(
- f"Exception branch: '{args.github_branch}', using Meta runners and no experiments."
- )
- runner_label_prefix = DEFAULT_LABEL_PREFIX
+ log.info(f"Exception branch: '{args.github_branch}', using meta runners")
+ label_type = WORKFLOW_LABEL_META
+ runner_ami = RUNNER_AMI_LEGACY
else:
try:
rollout_state = get_rollout_state_from_issue(
@@ -486,18 +344,35 @@
args.github_branch,
)
- is_canary = args.github_repo == "pytorch/pytorch-canary"
-
- runner_label_prefix = get_runner_prefix(
- rollout_state, (args.github_issue_owner, username), is_canary
+ label_type = get_fleet(
+ rollout_state,
+ (
+ args.github_issue_owner,
+ username,
+ ),
)
-
+ runner_ami = get_optin_feature(
+ rollout_state=rollout_state,
+ workflow_requestors=(
+ args.github_issue_owner,
+ username,
+ ),
+ feature=RUNNER_AMI_AMZ2023,
+ fallback=RUNNER_AMI_LEGACY,
+ )
except Exception as e:
log.error(
- f"Failed to get issue. Defaulting to Meta runners and no experiments. Exception: {e}"
+ f"Failed to get issue. Falling back to meta runners. Exception: {e}"
)
+ label_type = WORKFLOW_LABEL_META
+ runner_ami = RUNNER_AMI_LEGACY
- set_github_output(GH_OUTPUT_KEY_LABEL_TYPE, runner_label_prefix)
+ # For Canary builds use canary runners
+ if args.github_repo == "pytorch/pytorch-canary" and label_type == WORKFLOW_LABEL_LF:
+ label_type = WORKFLOW_LABEL_LF_CANARY
+
+ set_github_output(GH_OUTPUT_KEY_LABEL_TYPE, label_type)
+ set_github_output(GH_OUTPUT_KEY_AMI, runner_ami)
if __name__ == "__main__":