blob: e6089e24d691d53993cc325fef134ba42cbc3082 [file] [log] [blame]
#
# Copyright (c) 2011 Thomas Graf <tgraf@suug.ch>
#
"""Module providing access to network links
This module provides an interface to view configured network links,
modify them and to add and delete virtual network links.
The following is a basic example:
import netlink.core as netlink
import netlink.route.link as link
sock = netlink.Socket()
sock.connect(netlink.NETLINK_ROUTE)
cache = link.LinkCache() # create new empty link cache
cache.refill(sock) # fill cache with all configured links
eth0 = cache['eth0'] # lookup link "eth0"
print(eth0) # print basic configuration
The module contains the following public classes:
- Link -- Represents a network link. Instances can be created directly
via the constructor (empty link objects) or via the refill()
method of a LinkCache.
- LinkCache -- Derived from netlink.Cache, holds any number of
network links (Link instances). Main purpose is to keep
a local list of all network links configured in the
kernel.
The following public functions exist:
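- get(name, sock=None) -- Look up a Link object directly from the kernel
- resolve(name) -- Refill the module-level link cache and look up a link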
"""
from __future__ import absolute_import
# Version string of this module.
__version__ = "0.1"
# Names exported by "from <module> import *". The module-level functions
# get() and resolve() defined further down are not listed here.
__all__ = [
"LinkCache",
"Link",
]
import socket
from .. import core as netlink
from .. import capi as core_capi
from . import capi as capi
from .links import inet as inet
from .. import util as util
# Link statistics definitions
#
# Integer indices identifying the individual link counters; they are the
# values Link.get_stat() accepts and Link.format() uses for its table.

# Generic link counters (0..22)
(
    RX_PACKETS,
    TX_PACKETS,
    RX_BYTES,
    TX_BYTES,
    RX_ERRORS,
    TX_ERRORS,
    RX_DROPPED,
    TX_DROPPED,
    RX_COMPRESSED,
    TX_COMPRESSED,
    RX_FIFO_ERR,
    TX_FIFO_ERR,
    RX_LEN_ERR,
    RX_OVER_ERR,
    RX_CRC_ERR,
    RX_FRAME_ERR,
    RX_MISSED_ERR,
    TX_ABORT_ERR,
    TX_CARRIER_ERR,
    TX_HBEAT_ERR,
    TX_WIN_ERR,
    COLLISIONS,
    MULTICAST,
) = range(23)

# IPv6 counters (23..52)
(
    IP6_INPKTS,
    IP6_INHDRERRORS,
    IP6_INTOOBIGERRORS,
    IP6_INNOROUTES,
    IP6_INADDRERRORS,
    IP6_INUNKNOWNPROTOS,
    IP6_INTRUNCATEDPKTS,
    IP6_INDISCARDS,
    IP6_INDELIVERS,
    IP6_OUTFORWDATAGRAMS,
    IP6_OUTPKTS,
    IP6_OUTDISCARDS,
    IP6_OUTNOROUTES,
    IP6_REASMTIMEOUT,
    IP6_REASMREQDS,
    IP6_REASMOKS,
    IP6_REASMFAILS,
    IP6_FRAGOKS,
    IP6_FRAGFAILS,
    IP6_FRAGCREATES,
    IP6_INMCASTPKTS,
    IP6_OUTMCASTPKTS,
    IP6_INBCASTPKTS,
    IP6_OUTBCASTPKTS,
    IP6_INOCTETS,
    IP6_OUTOCTETS,
    IP6_INMCASTOCTETS,
    IP6_OUTMCASTOCTETS,
    IP6_INBCASTOCTETS,
    IP6_OUTBCASTOCTETS,
) = range(23, 53)

# ICMPv6 counters (53..56)
(
    ICMP6_INMSGS,
    ICMP6_INERRORS,
    ICMP6_OUTMSGS,
    ICMP6_OUTERRORS,
) = range(53, 57)
class LinkCache(netlink.Cache):
    """Cache of network links

    Entries are looked up with the subscript operator, either by
    interface index (integer key) or by link name (any other key).
    """

    def __init__(self, family=socket.AF_UNSPEC, cache=None):
        """Create a link cache.

        family -- address family, handed to _set_arg1()
        cache -- existing low-level cache to wrap; when omitted a new
                 "route/link" cache is allocated
        """
        if not cache:
            cache = self._alloc_cache_name("route/link")
        self._info_module = None
        self._protocol = netlink.NETLINK_ROUTE
        self._nl_cache = cache
        self._set_arg1(family)

    def __getitem__(self, key):
        """Return the Link stored under key.

        key -- interface index (int) or link name

        Raises KeyError, carrying the requested key, if the cache holds
        no matching link.
        """
        # isinstance() rather than an exact type test so that int
        # subclasses are treated as interface indices as well.
        if isinstance(key, int):
            link = capi.rtnl_link_get(self._nl_cache, key)
        else:
            link = capi.rtnl_link_get_by_name(self._nl_cache, key)
        if link is None:
            # Include the key so the error tells the caller what was missing
            raise KeyError(key)
        return Link.from_capi(link)

    @staticmethod
    def _new_object(obj):
        """Wrap a low-level object in a Link instance."""
        return Link(obj)

    def _new_cache(self, cache):
        """Wrap a low-level cache in a LinkCache using this cache's arg1."""
        return LinkCache(family=self.arg1, cache=cache)
class Link(netlink.Object):
    """Network link

    Python wrapper around a low-level rtnl link object. Attributes are
    exposed as properties that read from and write to that object.

    A Link can be used as a context manager: when the "with" block is
    left without an exception, change() is called to commit the
    modifications.
    """

    def __init__(self, obj=None):
        """Create a link, optionally wrapping the low-level object obj."""
        netlink.Object.__init__(self, "route/link", "link", obj)
        self._rtnl_link = self._obj2type(self._nl_object)
        # Links that already carry a type get their type module looked up
        if self.type:
            self._module_lookup("netlink.route.links." + self.type)
        self.inet = inet.InetLink(self)
        # Address family specific helpers, keyed by family name
        self.af = {"inet": self.inet}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Commit changes on a clean exit; never suppress exceptions."""
        if exc_type is None:
            self.change()
        else:
            return False

    @classmethod
    def from_capi(cls, obj):
        """Build an instance from a low-level link handle."""
        return cls(capi.link2obj(obj))

    @staticmethod
    def _obj2type(obj):
        """Convert a generic low-level object into a link handle."""
        return capi.obj2link(obj)

    def __cmp__(self, other):
        """Order links by interface index (only consulted by Python 2)."""
        return self.ifindex - other.ifindex

    @staticmethod
    def _new_instance(obj):
        """Wrap obj in a Link; raises ValueError if obj is empty."""
        if not obj:
            raise ValueError()
        return Link(obj)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def ifindex(self):
        """interface index"""
        return capi.rtnl_link_get_ifindex(self._rtnl_link)

    @ifindex.setter
    def ifindex(self, value):
        capi.rtnl_link_set_ifindex(self._rtnl_link, int(value))
        # ifindex is immutable but we assume that if _orig does not
        # have an ifindex specified, it was meant to be given here
        if capi.rtnl_link_get_ifindex(self._orig) == 0:
            capi.rtnl_link_set_ifindex(self._orig, int(value))

    @property
    @netlink.nlattr(type=str, fmt=util.bold)
    def name(self):
        """Name of link"""
        return capi.rtnl_link_get_name(self._rtnl_link)

    @name.setter
    def name(self, value):
        capi.rtnl_link_set_name(self._rtnl_link, value)
        # name is the secondary identifier, if _orig does not have
        # the name specified yet, assume it was meant to be specified
        # here. ifindex will always take priority, therefore if ifindex
        # is specified as well, this will be ignored automatically.
        if capi.rtnl_link_get_name(self._orig) is None:
            capi.rtnl_link_set_name(self._orig, value)

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def flags(self):
        """Flags

        Reading returns the list of flag names currently set.

        Setting this property will *not* reset the flags to the value
        you supply; it only adds or removes the flags named in the
        assignment.

        Examples:
        link.flags = '+xxx' # add xxx flag
        link.flags = 'xxx' # exactly the same
        link.flags = '-xxx' # remove xxx flag
        link.flags = [ '+xxx', '-yyy' ] # list operation
        """
        flags = capi.rtnl_link_get_flags(self._rtnl_link)
        return capi.rtnl_link_flags2str(flags, 256)[0].split(",")

    def _set_flag(self, flag):
        """Apply one flag operation: '-name' unsets, '+name'/'name' sets."""
        if flag.startswith("-"):
            i = capi.rtnl_link_str2flags(flag[1:])
            capi.rtnl_link_unset_flags(self._rtnl_link, i)
        else:
            # A leading "+" is optional when setting a flag
            flag_name = flag[1:] if flag.startswith("+") else flag
            i = capi.rtnl_link_str2flags(flag_name)
            capi.rtnl_link_set_flags(self._rtnl_link, i)

    @flags.setter
    def flags(self, value):
        # A single string is one operation; anything else is treated as
        # an iterable of operations. isinstance() keeps str subclasses
        # from being iterated character by character.
        if isinstance(value, str):
            self._set_flag(value)
        else:
            for flag in value:
                self._set_flag(flag)

    @property
    @netlink.nlattr(type=int, fmt=util.num)
    def mtu(self):
        """Maximum Transmission Unit"""
        return capi.rtnl_link_get_mtu(self._rtnl_link)

    @mtu.setter
    def mtu(self, value):
        capi.rtnl_link_set_mtu(self._rtnl_link, int(value))

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def family(self):
        """Address family"""
        return capi.rtnl_link_get_family(self._rtnl_link)

    @family.setter
    def family(self, value):
        capi.rtnl_link_set_family(self._rtnl_link, value)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def address(self):
        """Hardware address (MAC address)"""
        a = capi.rtnl_link_get_addr(self._rtnl_link)
        return netlink.AbstractAddress(a)

    @address.setter
    def address(self, value):
        # value must expose the low-level address as ._addr
        capi.rtnl_link_set_addr(self._rtnl_link, value._addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def broadcast(self):
        """Hardware broadcast address"""
        a = capi.rtnl_link_get_broadcast(self._rtnl_link)
        return netlink.AbstractAddress(a)

    @broadcast.setter
    def broadcast(self, value):
        # value must expose the low-level address as ._addr
        capi.rtnl_link_set_broadcast(self._rtnl_link, value._addr)

    @property
    @netlink.nlattr(type=str, immutable=True, fmt=util.string)
    def qdisc(self):
        """Name of qdisc (cannot be changed)"""
        return capi.rtnl_link_get_qdisc(self._rtnl_link)

    @qdisc.setter
    def qdisc(self, value):
        capi.rtnl_link_set_qdisc(self._rtnl_link, value)

    @property
    @netlink.nlattr(type=int, fmt=util.num)
    def txqlen(self):
        """Length of transmit queue"""
        return capi.rtnl_link_get_txqlen(self._rtnl_link)

    @txqlen.setter
    def txqlen(self, value):
        capi.rtnl_link_set_txqlen(self._rtnl_link, int(value))

    @property
    @netlink.nlattr(type=str, immutable=True, fmt=util.string)
    def arptype(self):
        """Type of link (cannot be changed)"""
        type_ = capi.rtnl_link_get_arptype(self._rtnl_link)
        return core_capi.nl_llproto2str(type_, 64)[0]

    @arptype.setter
    def arptype(self, value):
        i = core_capi.nl_str2llproto(value)
        capi.rtnl_link_set_arptype(self._rtnl_link, i)

    @property
    @netlink.nlattr(type=str, immutable=True, fmt=util.string, title="state")
    def operstate(self):
        """Operational status"""
        operstate = capi.rtnl_link_get_operstate(self._rtnl_link)
        return capi.rtnl_link_operstate2str(operstate, 32)[0]

    @operstate.setter
    def operstate(self, value):
        i = capi.rtnl_link_str2operstate(value)
        capi.rtnl_link_set_operstate(self._rtnl_link, i)

    @property
    @netlink.nlattr(type=str, immutable=True, fmt=util.string)
    def mode(self):
        """Link mode"""
        mode = capi.rtnl_link_get_linkmode(self._rtnl_link)
        return capi.rtnl_link_mode2str(mode, 32)[0]

    @mode.setter
    def mode(self, value):
        i = capi.rtnl_link_str2mode(value)
        capi.rtnl_link_set_linkmode(self._rtnl_link, i)

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def alias(self):
        """Interface alias (SNMP)"""
        return capi.rtnl_link_get_ifalias(self._rtnl_link)

    @alias.setter
    def alias(self, value):
        capi.rtnl_link_set_ifalias(self._rtnl_link, value)

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def type(self):
        """Link type"""
        return capi.rtnl_link_get_type(self._rtnl_link)

    @type.setter
    def type(self, value):
        # A negative return value means the type was not accepted
        if capi.rtnl_link_set_type(self._rtnl_link, value) < 0:
            raise NameError("unknown info type")
        self._module_lookup("netlink.route.links." + value)

    def get_stat(self, stat):
        """Retrieve statistical information

        stat -- statistic index (for example RX_PACKETS) or the name of
                a statistic, which is translated to its index first

        Raises NameError if a name cannot be translated.
        """
        if isinstance(stat, str):
            stat = capi.rtnl_link_str2stat(stat)
            if stat < 0:
                raise NameError("unknown name of statistic")
        return capi.rtnl_link_get_stat(self._rtnl_link, stat)

    def enslave(self, slave, sock=None):
        """Enslave the link slave to this link.

        sock defaults to the socket looked up for NETLINK_ROUTE. The
        return value of rtnl_link_enslave() is passed back unchanged;
        unlike add()/change()/delete() no exception is raised here.
        """
        if not sock:
            sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
        return capi.rtnl_link_enslave(sock._sock, self._rtnl_link, slave._rtnl_link)

    def release(self, slave, sock=None):
        """Release the link slave from this link.

        sock defaults to the socket looked up for NETLINK_ROUTE. The
        return value of rtnl_link_release() is passed back unchanged;
        unlike add()/change()/delete() no exception is raised here.
        """
        if not sock:
            sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
        return capi.rtnl_link_release(sock._sock, self._rtnl_link, slave._rtnl_link)

    def add(self, sock=None, flags=None):
        """Add this link in the kernel.

        sock -- socket to use; defaults to the NETLINK_ROUTE socket
        flags -- netlink message flags; NLM_F_CREATE is used when no
                 (or zero) flags are given

        Raises netlink.KernelError if the request fails.
        """
        if not sock:
            sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
        if not flags:
            flags = netlink.NLM_F_CREATE
        ret = capi.rtnl_link_add(sock._sock, self._rtnl_link, flags)
        if ret < 0:
            raise netlink.KernelError(ret)

    def change(self, sock=None, flags=0):
        """Commit changes made to the link object

        Raises netlink.NetlinkError if the original link is not
        available and netlink.KernelError if the request fails.
        """
        if sock is None:
            sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
        if not self._orig:
            raise netlink.NetlinkError("Original link not available")
        ret = capi.rtnl_link_change(sock._sock, self._orig, self._rtnl_link, flags)
        if ret < 0:
            raise netlink.KernelError(ret)

    def delete(self, sock=None):
        """Attempt to delete this link in the kernel

        Raises netlink.KernelError if the request fails.
        """
        if sock is None:
            sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
        ret = capi.rtnl_link_delete(sock._sock, self._rtnl_link)
        if ret < 0:
            raise netlink.KernelError(ret)

    ###################################################################
    # private properties
    #
    # Used for formatting output. USE AT OWN RISK
    @property
    def _state(self):
        """Formatted up/down state, with a no-carrier marker if needed."""
        # Read the flags once; every access queries the low-level object
        current = self.flags
        if "up" in current:
            buf = util.good("up")
            if "lowerup" not in current:
                buf += " " + util.bad("no-carrier")
        else:
            buf = util.bad("down")
        return buf

    @property
    def _brief(self):
        """Brief text from the type module plus every address family."""
        return self._module_brief() + self._foreach_af("brief")

    @property
    def _flags(self):
        """Comma separated flags, without those already shown by _state."""
        ignore = [
            "up",
            "running",
            "lowerup",
        ]
        return ",".join([flag for flag in self.flags if flag not in ignore])

    def _foreach_af(self, name, args=None):
        """Call method name(args) on every address family helper.

        Non-empty results are converted to strings and concatenated,
        each preceded by a space. An AttributeError raised while
        looking up or running the method is ignored.
        """
        buf = ""
        for af in self.af:
            try:
                func = getattr(self.af[af], name)
                s = str(func(args))
                if len(s) > 0:
                    buf += " " + s
            except AttributeError:
                pass
        return buf

    def format(self, details=False, stats=False, indent=""):
        """Return link as formatted text

        details -- also include mtu, queue length, qdisc, operational
                   state, broadcast address and alias
        stats -- also include the RX/TX statistics table
        indent -- indentation string handed to the formatter
        """
        fmt = util.MyFormatter(self, indent)
        buf = fmt.format(
            "{a|ifindex} {a|name} {a|arptype} {a|address} "
            "{a|_state} <{a|_flags}> {a|_brief}"
        )
        if details:
            # NOTE(review): no "weight" property is defined on Link in
            # this file -- confirm the formatter copes with that key.
            buf += fmt.nl("\t{t|mtu} {t|txqlen} {t|weight} " "{t|qdisc} {t|operstate}")
            buf += fmt.nl("\t{t|broadcast} {t|alias}")
            buf += self._foreach_af("details", fmt)
        if stats:
            # (title, RX statistic, TX statistic); None means the column
            # has no counter for that row.
            lst = [
                ("Packets", RX_PACKETS, TX_PACKETS),
                ("Bytes", RX_BYTES, TX_BYTES),
                ("Errors", RX_ERRORS, TX_ERRORS),
                ("Dropped", RX_DROPPED, TX_DROPPED),
                ("Compressed", RX_COMPRESSED, TX_COMPRESSED),
                ("FIFO Errors", RX_FIFO_ERR, TX_FIFO_ERR),
                ("Length Errors", RX_LEN_ERR, None),
                ("Over Errors", RX_OVER_ERR, None),
                ("CRC Errors", RX_CRC_ERR, None),
                ("Frame Errors", RX_FRAME_ERR, None),
                ("Missed Errors", RX_MISSED_ERR, None),
                ("Abort Errors", None, TX_ABORT_ERR),
                ("Carrier Errors", None, TX_CARRIER_ERR),
                ("Heartbeat Errors", None, TX_HBEAT_ERR),
                ("Window Errors", None, TX_WIN_ERR),
                ("Collisions", None, COLLISIONS),
                ("Multicast", None, MULTICAST),
                ("", None, None),
                ("Ipv6:", None, None),
                ("Packets", IP6_INPKTS, IP6_OUTPKTS),
                ("Bytes", IP6_INOCTETS, IP6_OUTOCTETS),
                ("Discards", IP6_INDISCARDS, IP6_OUTDISCARDS),
                ("Multicast Packets", IP6_INMCASTPKTS, IP6_OUTMCASTPKTS),
                ("Multicast Bytes", IP6_INMCASTOCTETS, IP6_OUTMCASTOCTETS),
                ("Broadcast Packets", IP6_INBCASTPKTS, IP6_OUTBCASTPKTS),
                ("Broadcast Bytes", IP6_INBCASTOCTETS, IP6_OUTBCASTOCTETS),
                ("Delivers", IP6_INDELIVERS, None),
                ("Forwarded", None, IP6_OUTFORWDATAGRAMS),
                ("No Routes", IP6_INNOROUTES, IP6_OUTNOROUTES),
                ("Header Errors", IP6_INHDRERRORS, None),
                ("Too Big Errors", IP6_INTOOBIGERRORS, None),
                ("Address Errors", IP6_INADDRERRORS, None),
                ("Unknown Protocol", IP6_INUNKNOWNPROTOS, None),
                ("Truncated Packets", IP6_INTRUNCATEDPKTS, None),
                ("Reasm Timeouts", IP6_REASMTIMEOUT, None),
                ("Reasm Requests", IP6_REASMREQDS, None),
                ("Reasm Failures", IP6_REASMFAILS, None),
                ("Reasm OK", IP6_REASMOKS, None),
                ("Frag Created", None, IP6_FRAGCREATES),
                ("Frag Failures", None, IP6_FRAGFAILS),
                ("Frag OK", None, IP6_FRAGOKS),
                ("", None, None),
                ("ICMPv6:", None, None),
                ("Messages", ICMP6_INMSGS, ICMP6_OUTMSGS),
                ("Errors", ICMP6_INERRORS, ICMP6_OUTERRORS),
            ]
            buf += "\n\t%s%s%s%s\n" % (
                33 * " ",
                util.title("RX"),
                15 * " ",
                util.title("TX"),
            )
            for title, rx_stat, tx_stat in lst:
                # Compare against None explicitly: RX_PACKETS is index 0
                # and must not be mistaken for "no counter".
                row = [
                    util.kw(title),
                    self.get_stat(rx_stat) if rx_stat is not None else "",
                    self.get_stat(tx_stat) if tx_stat is not None else "",
                ]
                buf += "\t{0[0]:27} {0[1]:>16} {0[2]:>16}\n".format(row)
            buf += self._foreach_af("stats")
        return buf
def get(name, sock=None):
    """Lookup Link object directly from kernel

    name -- name of the link to look up; must not be empty
    sock -- socket to use; defaults to the socket looked up for
            NETLINK_ROUTE

    Returns a Link, or None if the lookup yields nothing. Raises
    ValueError if name is empty.
    """
    if not name:
        raise ValueError()
    nl_sock = sock if sock else netlink.lookup_socket(netlink.NETLINK_ROUTE)
    raw_link = capi.get_from_kernel(nl_sock._sock, 0, name)
    return Link.from_capi(raw_link) if raw_link else None
# Module-level cache used by resolve(); created when the module is imported.
_link_cache = LinkCache()


def resolve(name):
    """Refill the module-level link cache and return the entry for name.

    The lookup is a subscript on the LinkCache, so a missing link
    surfaces as the KeyError raised there.
    """
    _link_cache.refill()
    found = _link_cache[name]
    return found