blob: 6a11659868479d91ec47bc8f747b4fa7ea48b048 [file] [log] [blame]
#!/usr/bin/env python3
# encoding: utf-8
import os
import sys
import shlex
import argparse
from subprocess import Popen, PIPE
def run_proc(args, shell = False, input = None):
"""A simple wrapper around Popen, returning (rc, stdout, stderr)"""
process = Popen(args, text = True, shell = shell,
stdin = PIPE, stdout = PIPE, stderr = PIPE)
output, error = process.communicate(input)
return (process.returncode, output, error)
keywords = ("iptables-translate", "ip6tables-translate", "ebtables-translate")
xtables_nft_multi = 'xtables-nft-multi'
if sys.stdout.isatty():
colors = {"magenta": "\033[95m", "green": "\033[92m", "yellow": "\033[93m",
"red": "\033[91m", "end": "\033[0m"}
else:
colors = {"magenta": "", "green": "", "yellow": "", "red": "", "end": ""}
def magenta(string):
return colors["magenta"] + string + colors["end"]
def red(string):
return colors["red"] + string + colors["end"]
def yellow(string):
return colors["yellow"] + string + colors["end"]
def green(string):
return colors["green"] + string + colors["end"]
def test_one_xlate(name, sourceline, expected, result):
rc, output, error = run_proc([xtables_nft_multi] + shlex.split(sourceline))
if rc != 0:
result.append(name + ": " + red("Error: ") + "iptables-translate failure")
result.append(error)
return False
translation = output.rstrip(" \n")
if translation != expected:
result.append(name + ": " + red("Fail"))
result.append(magenta("src: ") + sourceline.rstrip(" \n"))
result.append(magenta("exp: ") + expected)
result.append(magenta("res: ") + translation + "\n")
return False
return True
def test_one_replay(name, sourceline, expected, result):
global args
searchline = None
if sourceline.find(';') >= 0:
sourceline, searchline = sourceline.split(';')
srcwords = shlex.split(sourceline)
srccmd = srcwords[0]
ipt = srccmd.split('-')[0]
table_idx = -1
chain_idx = -1
table_name = "filter"
chain_name = None
for idx in range(1, len(srcwords)):
if srcwords[idx] in ["-A", "-I", "--append", "--insert"]:
chain_idx = idx
chain_name = srcwords[idx + 1]
elif srcwords[idx] in ["-t", "--table"]:
table_idx = idx
table_name = srcwords[idx + 1]
if not chain_name:
return True # nothing to do?
if searchline is None:
# adjust sourceline as required
checkcmd = srcwords[:]
checkcmd[0] = ipt
checkcmd[chain_idx] = "--check"
else:
checkcmd = [ipt, "-t", table_name]
checkcmd += ["--check", chain_name, searchline]
fam = ""
if srccmd.startswith("ip6"):
fam = "ip6 "
elif srccmd.startswith("ebt"):
fam = "bridge "
expected = [ l.removeprefix("nft ").strip(" '") for l in expected.split("\n") ]
nft_input = [
"flush ruleset",
"add table " + fam + table_name,
"add chain " + fam + table_name + " " + chain_name,
] + expected
rc, output, error = run_proc([args.nft, "-f", "-"], shell = False, input = "\n".join(nft_input))
if rc != 0:
result.append(name + ": " + red("Replay Fail"))
result.append(args.nft + " call failed: " + error.rstrip('\n'))
for line in nft_input:
result.append(magenta("input: ") + line)
return False
rc, output, error = run_proc([xtables_nft_multi] + checkcmd)
if rc != 0:
result.append(name + ": " + red("Check Fail"))
result.append(magenta("check: ") + " ".join(checkcmd))
result.append(magenta("error: ") + error)
rc, output, error = run_proc([xtables_nft_multi, ipt + "-save"])
for l in output.split("\n"):
result.append(magenta("ipt: ") + l)
rc, output, error = run_proc([args.nft, "list", "ruleset"])
for l in output.split("\n"):
result.append(magenta("nft: ") + l)
return False
return True
def run_test(name, payload):
global xtables_nft_multi
global args
test_passed = True
tests = passed = failed = errors = 0
result = []
line = payload.readline()
while line:
if not line.startswith(keywords):
line = payload.readline()
continue
sourceline = replayline = line.rstrip("\n")
if line.find(';') >= 0:
sourceline = line.split(';')[0]
expected = payload.readline().rstrip(" \n")
next_expected = payload.readline()
if next_expected.startswith("nft"):
expected += "\n" + next_expected.rstrip(" \n")
line = payload.readline()
else:
line = next_expected
tests += 1
if test_one_xlate(name, sourceline, expected, result):
passed += 1
else:
errors += 1
test_passed = False
continue
if args.replay:
tests += 1
if test_one_replay(name, replayline, expected, result):
passed += 1
else:
errors += 1
test_passed = False
rc, output, error = run_proc([args.nft, "flush", "ruleset"])
if rc != 0:
result.append(name + ": " + red("Fail"))
result.append("nft flush ruleset call failed: " + error)
if (passed == tests):
print(name + ": " + green("OK"))
if not test_passed:
print("\n".join(result), file=sys.stderr)
return tests, passed, failed, errors
def load_test_files():
test_files = total_tests = total_passed = total_error = total_failed = 0
tests = sorted(os.listdir("extensions"))
for test in ['extensions/' + f for f in tests if f.endswith(".txlate")]:
with open(test, "r") as payload:
tests, passed, failed, errors = run_test(test, payload)
test_files += 1
total_tests += tests
total_passed += passed
total_failed += failed
total_error += errors
return (test_files, total_tests, total_passed, total_failed, total_error)
def spawn_netns():
# prefer unshare module
try:
import unshare
unshare.unshare(unshare.CLONE_NEWNET)
return True
except:
pass
# sledgehammer style:
# - call ourselves prefixed by 'unshare -n' if found
# - pass extra --no-netns parameter to avoid another recursion
try:
import shutil
unshare = shutil.which("unshare")
if unshare is None:
return False
sys.argv.append("--no-netns")
os.execv(unshare, [unshare, "-n", sys.executable] + sys.argv)
except:
pass
return False
def main():
global xtables_nft_multi
if args.replay:
if os.getuid() != 0:
print("Replay test requires root, sorry", file=sys.stderr)
return
if not args.no_netns and not spawn_netns():
print("Cannot run in own namespace, connectivity might break",
file=sys.stderr)
if not args.host:
os.putenv("XTABLES_LIBDIR", os.path.abspath("extensions"))
xtables_nft_multi = os.path.abspath(os.path.curdir) \
+ '/iptables/' + xtables_nft_multi
files = tests = passed = failed = errors = 0
for test in args.test:
if not test.endswith(".txlate"):
test += ".txlate"
try:
with open(test, "r") as payload:
t, p, f, e = run_test(test, payload)
files += 1
tests += t
passed += p
failed += f
errors += e
except IOError:
print(red("Error: ") + "test file does not exist", file=sys.stderr)
return 99
if files == 0:
files, tests, passed, failed, errors = load_test_files()
if files > 1:
file_word = "files"
else:
file_word = "file"
print("%d test %s, %d tests, %d tests passed, %d tests failed, %d errors"
% (files, file_word, tests, passed, failed, errors))
return passed - tests
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--host', action='store_true',
help='Run tests against installed binaries')
parser.add_argument('-R', '--replay', action='store_true',
help='Replay tests to check iptables-nft parser')
parser.add_argument('-n', '--nft', type=str, default='nft',
help='Replay using given nft binary (default: \'%(default)s\')')
parser.add_argument('--no-netns', action='store_true',
help='Do not run testsuite in own network namespace')
parser.add_argument("test", nargs="*", help="run only the specified test file(s)")
args = parser.parse_args()
sys.exit(main())