blob: ba66e62d8d0fa445cd357f3f4e6a57ffc40c964e [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unittests for the compiler module."""
from __future__ import print_function
import os
import tempfile
import unittest
import arch
import bpf
import compiler
import parser # pylint: disable=wrong-import-order
ARCH_64 = arch.Arch.load_from_json(
os.path.join(
os.path.dirname(os.path.abspath(__file__)), 'testdata/arch_64.json'))
class CompileFilterStatementTests(unittest.TestCase):
"""Tests for PolicyCompiler.compile_filter_statement."""
def setUp(self):
self.arch = ARCH_64
self.compiler = compiler.PolicyCompiler(self.arch)
def _compile(self, line):
with tempfile.NamedTemporaryFile(mode='w') as policy_file:
policy_file.write(line)
policy_file.flush()
policy_parser = parser.PolicyParser(
self.arch, kill_action=bpf.KillProcess())
parsed_policy = policy_parser.parse_file(policy_file.name)
assert len(parsed_policy.filter_statements) == 1
return self.compiler.compile_filter_statement(
parsed_policy.filter_statements[0],
kill_action=bpf.KillProcess())
def test_allow(self):
"""Accept lines where the syscall is accepted unconditionally."""
block = self._compile('read: allow')
self.assertEqual(block.filter, None)
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
0)[1], 'ALLOW')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
1)[1], 'ALLOW')
def test_arg0_eq_generated_code(self):
"""Accept lines with an argument filter with ==."""
block = self._compile('read: arg0 == 0x100')
# It might be a bit brittle to check the generated code in each test
# case instead of just the behavior, but there should be at least one
# test where this happens.
self.assertEqual(
block.filter.instructions,
[
bpf.SockFilter(bpf.BPF_LD | bpf.BPF_W | bpf.BPF_ABS, 0, 0,
bpf.arg_offset(0, True)),
# Jump to KILL_PROCESS if the high word does not match.
bpf.SockFilter(bpf.BPF_JMP | bpf.BPF_JEQ | bpf.BPF_K, 0, 2, 0),
bpf.SockFilter(bpf.BPF_LD | bpf.BPF_W | bpf.BPF_ABS, 0, 0,
bpf.arg_offset(0, False)),
# Jump to KILL_PROCESS if the low word does not match.
bpf.SockFilter(bpf.BPF_JMP | bpf.BPF_JEQ | bpf.BPF_K, 1, 0,
0x100),
bpf.SockFilter(bpf.BPF_RET, 0, 0,
bpf.SECCOMP_RET_KILL_PROCESS),
bpf.SockFilter(bpf.BPF_RET, 0, 0, bpf.SECCOMP_RET_ALLOW),
])
def test_arg0_comparison_operators(self):
"""Accept lines with an argument filter with comparison operators."""
biases = (-1, 0, 1)
# For each operator, store the expectations of simulating the program
# against the constant plus each entry from the |biases| array.
cases = (
('==', ('KILL_PROCESS', 'ALLOW', 'KILL_PROCESS')),
('!=', ('ALLOW', 'KILL_PROCESS', 'ALLOW')),
('<', ('ALLOW', 'KILL_PROCESS', 'KILL_PROCESS')),
('<=', ('ALLOW', 'ALLOW', 'KILL_PROCESS')),
('>', ('KILL_PROCESS', 'KILL_PROCESS', 'ALLOW')),
('>=', ('KILL_PROCESS', 'ALLOW', 'ALLOW')),
)
for operator, expectations in cases:
block = self._compile('read: arg0 %s 0x100' % operator)
# Check the filter's behavior.
for bias, expectation in zip(biases, expectations):
self.assertEqual(
block.simulate(self.arch.arch_nr,
self.arch.syscalls['read'],
0x100 + bias)[1], expectation)
def test_arg0_mask_operator(self):
"""Accept lines with an argument filter with &."""
block = self._compile('read: arg0 & 0x3')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
0)[1], 'KILL_PROCESS')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
1)[1], 'ALLOW')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
2)[1], 'ALLOW')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
3)[1], 'ALLOW')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
4)[1], 'KILL_PROCESS')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
5)[1], 'ALLOW')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
6)[1], 'ALLOW')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
7)[1], 'ALLOW')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
8)[1], 'KILL_PROCESS')
def test_arg0_in_operator(self):
"""Accept lines with an argument filter with in."""
block = self._compile('read: arg0 in 0x3')
# The 'in' operator only ensures that no bits outside the mask are set,
# which means that 0 is always allowed.
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
0)[1], 'ALLOW')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
1)[1], 'ALLOW')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
2)[1], 'ALLOW')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
3)[1], 'ALLOW')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
4)[1], 'KILL_PROCESS')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
5)[1], 'KILL_PROCESS')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
6)[1], 'KILL_PROCESS')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
7)[1], 'KILL_PROCESS')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
8)[1], 'KILL_PROCESS')
def test_arg0_short_gt_ge_comparisons(self):
"""Ensure that the short comparison optimization kicks in."""
if self.arch.bits == 32:
return
short_constant_str = '0xdeadbeef'
short_constant = int(short_constant_str, base=0)
long_constant_str = '0xbadc0ffee0ddf00d'
long_constant = int(long_constant_str, base=0)
biases = (-1, 0, 1)
# For each operator, store the expectations of simulating the program
# against the constant plus each entry from the |biases| array.
cases = (
('<', ('ALLOW', 'KILL_PROCESS', 'KILL_PROCESS')),
('<=', ('ALLOW', 'ALLOW', 'KILL_PROCESS')),
('>', ('KILL_PROCESS', 'KILL_PROCESS', 'ALLOW')),
('>=', ('KILL_PROCESS', 'ALLOW', 'ALLOW')),
)
for operator, expectations in cases:
short_block = self._compile(
'read: arg0 %s %s' % (operator, short_constant_str))
long_block = self._compile(
'read: arg0 %s %s' % (operator, long_constant_str))
# Check that the emitted code is shorter when the high word of the
# constant is zero.
self.assertLess(
len(short_block.filter.instructions),
len(long_block.filter.instructions))
# Check the filter's behavior.
for bias, expectation in zip(biases, expectations):
self.assertEqual(
long_block.simulate(self.arch.arch_nr,
self.arch.syscalls['read'],
long_constant + bias)[1], expectation)
self.assertEqual(
short_block.simulate(
self.arch.arch_nr, self.arch.syscalls['read'],
short_constant + bias)[1], expectation)
def test_and_or(self):
"""Accept lines with a complex expression in DNF."""
block = self._compile('read: arg0 == 0 && arg1 == 0 || arg0 == 1')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 0,
0)[1], 'ALLOW')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 0,
1)[1], 'KILL_PROCESS')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 1,
0)[1], 'ALLOW')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 1,
1)[1], 'ALLOW')
def test_ret_errno(self):
"""Accept lines that return errno."""
block = self._compile('read : arg0 == 0 || arg0 == 1 ; return 1')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
0)[1:], ('ERRNO', 1))
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
1)[1:], ('ERRNO', 1))
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
2)[1], 'KILL_PROCESS')
def test_ret_errno_unconditionally(self):
"""Accept lines that return errno unconditionally."""
block = self._compile('read: return 1')
self.assertEqual(
block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
0)[1:], ('ERRNO', 1))
def test_mmap_write_xor_exec(self):
"""Accept the idiomatic filter for mmap."""
block = self._compile(
'read : arg0 in ~PROT_WRITE || arg0 in ~PROT_EXEC')
prot_exec_and_write = 6
for prot in range(0, 0xf):
if (prot & prot_exec_and_write) == prot_exec_and_write:
self.assertEqual(
block.simulate(self.arch.arch_nr,
self.arch.syscalls['read'], prot)[1],
'KILL_PROCESS')
else:
self.assertEqual(
block.simulate(self.arch.arch_nr,
self.arch.syscalls['read'], prot)[1],
'ALLOW')
if __name__ == '__main__':
unittest.main()