blob: d9533c7f56ef013c009d5a9246afa38003d77a41 [file] [log] [blame]
# This file is part of Scapy
# Scapy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# any later version.
#
# Scapy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Scapy. If not, see <http://www.gnu.org/licenses/>.
# Copyright (C) 2016 Gauthier Sebaux
# scapy.contrib.description = ProfinetIO Real-Time Cyclic (RTC)
# scapy.contrib.status = loads
"""
PROFINET IO layers for scapy which correspond to Real-Time Cyclic data
"""
# external imports
from __future__ import absolute_import
import math
import struct
# Scapy imports
from scapy.all import Packet, bind_layers, Ether, UDP, Field, conf
from scapy.fields import BitEnumField, BitField, ByteField,\
FlagsField,\
PacketListField,\
ShortField, StrFixedLenField,\
XBitField, XByteField
# local imports
from scapy.contrib.pnio import ProfinetIO
from scapy.compat import orb
from scapy.modules.six.moves import range
#####################################
## PROFINET Real-Time Data Packets ##
#####################################
class PNIORealTimeIOxS(Packet):
"""IOCS and IOPS packets for PROFINET Real-Time payload"""
name = "PNIO RTC IOxS"
fields_desc = [
BitEnumField("dataState", 1, 1, ["bad", "good"]),
BitEnumField("instance", 0, 2, ["subslot", "slot", "device", "controller"]),
XBitField("reserved", 0, 4),
BitField("extension", 0, 1),
]
def extract_padding(self, s):
return None, s # No extra payload
class PNIORealTimeRawData(Packet):
"""Raw data packets for PROFINET Real-Time payload.
It's a configurable packet whose config only includes a fix length. The
config parameter must then be a dict {"length": X}.
PROFINET IO specification impose this packet to be followed with an IOPS
(PNIORealTimeIOxS)"""
__slots__ = ["_config"]
name = "PNIO RTC Raw data"
fields_desc = [
StrFixedLenField("load", "", length_from=lambda p: p[PNIORealTimeRawData].length()),
]
def __init__(self, _pkt="", post_transform=None, _internal=0, _underlayer=None, config=None, **fields):
"""
length=None means that the length must be managed by the user. If it's
defined, the field will always be length-long (padded with b"\\x00" if
needed)
"""
self._config = config
Packet.__init__(self, _pkt=_pkt, post_transform=post_transform,
_internal=_internal, _underlayer=_underlayer, **fields)
def copy(self):
pkt = Packet.copy(self)
pkt._config = self._config
return pkt
def clone_with(self, *args, **kargs):
pkt = Packet.clone_with(self, *args, **kargs)
pkt._config = self._config
return pkt
def length(self):
"""Get the length of the raw data"""
# Manage the length of the packet if a length is provided
return self._config["length"]
# Make sure an IOPS follows a data
bind_layers(PNIORealTimeRawData, PNIORealTimeIOxS)
###############################
## PROFINET Real-Time Fields ##
###############################
class LowerLayerBoundPacketListField(PacketListField):
"""PacketList which binds each underlayer of packets to the current pkt"""
def m2i(self, pkt, m):
return self.cls(m, _underlayer=pkt)
class NotionalLenField(Field):
"""A len fields which isn't present in the machine representation, but is
computed from a given lambda"""
__slots__ = ["length_from", "count_from"]
def __init__(self, name, default, length_from=None, count_from=None):
Field.__init__(self, name, default)
self.length_from = length_from
self.count_from = count_from
def addfield(self, pkt, s, val):
return s # Isn't present in the machine packet
def getfield(self, pkt, s):
val = None
if self.length_from is not None:
val = self.length_from(pkt, s)
elif self.count_from is not None:
val = self.count_from(pkt, s)
return s, val
###############################
## PNIORealTime Configuration #
###############################
# conf.contribs["PNIO_RTC"] is a dict which contains data layout for each Ethernet
# communications. It must be formatted as such:
# {(Ether.src, Ether.dst): [(start, type, config), ...]}
# start: index of a data field from the END of the data buffer (-1, -2, ...)
# type: class to be instanciated to represent these data
# config: a config dict, given to the type class constructor
conf.contribs["PNIO_RTC"] = {}
def _get_ethernet(pkt):
"""Find the Ethernet packet of underlayer or None"""
ether = pkt
while ether is not None and not isinstance(ether, Ether):
ether = ether.underlayer
return ether
def pnio_update_config(config):
"""Update the PNIO RTC config"""
conf.contribs["PNIO_RTC"].update(config)
def pnio_get_config(pkt):
"""Retrieve the config for a given communication"""
# get the config based on the tuple (Ether.src, Ether.dst)
ether = _get_ethernet(pkt)
config = None
if ether is not None and (ether.src, ether.dst) in conf.contribs["PNIO_RTC"]:
config = conf.contribs["PNIO_RTC"][(ether.src, ether.dst)]
return config
###############################
## PROFINET Real-Time Packet ##
###############################
def _pnio_rtc_guess_payload_class(_pkt, _underlayer=None, *args, **kargs):
"""A dispatcher for the packet list field which manage the configuration
to fin dthe appropriate class"""
config = pnio_get_config(_underlayer)
if isinstance(config, list):
# If we have a valid config, it's a list which describe each data
# packets the rest being IOCS
cur_index = -len(_pkt)
for index, cls, params in config:
if cur_index == index:
return cls(_pkt, config=params, *args, **kargs)
# Not a data => IOCS packet
return PNIORealTimeIOxS(_pkt, *args, **kargs)
else:
# No config => Raw data which dissect the whole _pkt
return PNIORealTimeRawData(_pkt,
config={"length": len(_pkt)},
*args, **kargs
)
_PNIO_DS_FLAGS = [
"primary",
"redundancy",
"validData",
"reserved_1",
"run",
"no_problem",
"reserved_2",
"ignore",
]
class PNIORealTime(Packet):
"""PROFINET cyclic real-time"""
name = "PROFINET Real-Time"
fields_desc = [
NotionalLenField("len", None, length_from=lambda p, s: len(s)),
NotionalLenField("dataLen", None, length_from=lambda p, s: len(s[:-4].rstrip(b"\0"))),
LowerLayerBoundPacketListField("data", [], _pnio_rtc_guess_payload_class, length_from=lambda p: p.dataLen),
StrFixedLenField("padding", "", length_from=lambda p: p[PNIORealTime].padding_length()),
ShortField("cycleCounter", 0),
FlagsField("dataStatus", 0x35, 8, _PNIO_DS_FLAGS),
ByteField("transferStatus", 0)
]
overload_fields = {
ProfinetIO: {"frameID": 0x8000}, # RT_CLASS_1
}
def padding_length(self):
"""Compute the length of the padding need for the ethernet frame"""
fld, val = self.getfield_and_val("data")
# use the len field if available to define the padding length, eg for
# dissected packets
pkt_len = self.getfieldval("len")
if pkt_len is not None:
return max(0, pkt_len - len(fld.addfield(self, b"", val)) - 4)
if isinstance(self.underlayer, ProfinetIO) and \
isinstance(self.underlayer.underlayer, UDP):
return max(0, 12 - len(fld.addfield(self, b"", val)))
else:
return max(0, 40 - len(fld.addfield(self, b"", val)))
@staticmethod
def analyse_data(packets):
"""Analyse the data to find heuristical properties and determine
location and type of data"""
loc = PNIORealTime.find_data(packets)
loc = PNIORealTime.analyse_profisafe(packets, loc)
pnio_update_config(loc)
return loc
@staticmethod
def find_data(packets):
"""Analyse a packet list to extract data offsets from packets data."""
# a dictionary to count data offsets (ie != 0x80)
# It's formatted: {(src, dst): (total, [count for offset in len])}
heuristic = {}
# Counts possible data locations
# 0x80 are mainly IOxS and trailling 0x00s are just padding
for pkt in packets:
if PNIORealTime in pkt:
pdu = bytes(pkt[PNIORealTime])[:-4].rstrip(b"\0")
if (pkt.src, pkt.dst) not in heuristic:
heuristic[(pkt.src, pkt.dst)] = (0, [])
total, counts = heuristic[(pkt.src, pkt.dst)]
if len(counts) < len(pdu):
counts.extend([0 for _ in range(len(pdu) - len(counts))])
for i in range(len(pdu)):
if orb(pdu[i]) != 0x80:
counts[i] += 1
comm = (pkt.src, pkt.dst)
heuristic[comm] = (total + 1, counts)
# Determine data locations
locations = {}
for comm in heuristic:
total, counts = heuristic[comm]
length = len(counts)
loc = locations[comm] = []
start = None
for i in range(length):
if counts[i] > total // 2: # Data if more than half is != 0x80
if start is None:
start = i
else:
if start is not None:
loc.append((
start - length,
PNIORealTimeRawData,
{"length": i - start}
))
start = None
return locations
@staticmethod
def analyse_profisafe(packets, locations=None):
"""Analyse a packet list to find possible PROFISafe profils.
It's based on an heuristical analysis of each payload to try to find
CRC and control/status byte.
locations: possible data locations. If not provided, analyse_pn_rt will
be called beforehand. If not given, it calls in the same time
analyse_data which update the configuration of the data field"""
# get data locations and entropy of bytes
if not locations:
locations = PNIORealTime.find_data(packets)
entropies = PNIORealTime.data_entropy(packets, locations)
# Try to find at least 3 high entropy successive bytes (the CRC)
for comm in entropies:
entropy = dict(entropies[comm]) # Convert tuples to key => value
for i in range(len(locations[comm])):
# update each location with its value after profisafe analysis
locations[comm][i] = \
PNIORealTime.analyse_one_profisafe_location(
locations[comm][i], entropy
)
return locations
@staticmethod
def analyse_one_profisafe_location(location, entropy):
"""Analyse one PNIO RTC data location to find if its a PROFISafe
:param location: location to analyse, a tuple (start, type, config)
:param entropy: the entropy of each byte of the packet data
:returns: the configuration associated with the data
"""
start, klass, conf = location
if conf["length"] >= 4: # Minimal PROFISafe length
succ_count = 0
for j in range(start, start + conf["length"]):
# Limit for a CRC is set to 6 bit of entropy min
if j in entropy and entropy[j] >= 6:
succ_count += 1
else:
succ_count = 0
# PROFISafe profiles must end with at least 3 bytes of high entropy
if succ_count >= 3: # Possible profisafe CRC
return (
start,
Profisafe,
{"CRC": succ_count, "length": conf["length"]}
)
# Not a PROFISafe profile
return (start, klass, conf)
@staticmethod
def data_entropy(packets, locations=None):
"""Analyse a packet list to find the entropy of each data byte
locations: possible data locations. If not provided, analyse_pn_rt will
be called beforehand. If not given, it calls in the same time
analyse_data which update the configuration of the data field"""
if not locations:
locations = PNIORealTime.find_data(packets)
# Retrieve the entropy of each data byte, for each communication
entropies = {}
for comm in locations:
if len(locations[comm]) > 0: # Doesn't append empty data
entropies[comm] = []
comm_packets = []
# fetch all packets from the communication
for pkt in packets:
if PNIORealTime in pkt and (pkt.src, pkt.dst) == comm:
comm_packets.append(
bytes(pkt[PNIORealTime])[:-4].rstrip(b"\0")
)
# Get the entropy
for start, dummy, conf in locations[comm]:
for i in range(start, start + conf["length"]):
entropies[comm].append(
(i, entropy_of_byte(comm_packets, i))
)
return entropies
@staticmethod
def draw_entropy(packets, locations=None):
"""Plot the entropy of each data byte of PN RT communication"""
import matplotlib.pyplot as plt
import matplotlib.cm as cm
entropies = PNIORealTime.data_entropy(packets, locations)
rows = len(entropies)
cur_row = 1
for comm in entropies:
index = []
vals = []
for i, ent in entropies[comm]:
index.append(i)
vals.append(ent)
# Offsets the indexes to get the index from the beginning
offset = -min(index)
index = [i + offset for i in index]
plt.subplot(rows, 1, cur_row)
plt.bar(index, vals, 0.8, color="r")
plt.xticks([i + 0.4 for i in index], index)
plt.title("Entropy from %s to %s" % comm)
cur_row += 1
plt.ylabel("Shannon Entropy")
plt.xlabel("Byte offset") # x label only on the last row
plt.legend()
plt.tight_layout()
plt.show()
def entropy_of_byte(packets, position):
"""Compute the entropy of a byte at a given offset"""
counter = [0 for _ in range(256)]
# Count each byte a appearance
for pkt in packets:
if -position <= len(pkt): # position must be a negative index
counter[orb(pkt[position])] += 1
# Compute the Shannon entropy
entropy = 0
length = len(packets)
for count in counter:
if count > 0:
ratio = float(count) / length
entropy -= ratio * math.log(ratio, 2)
return entropy
###############
## PROFISafe ##
###############
class XVarBytesField(XByteField):
"""Variable length bytes field, from 0 to 8 bytes"""
__slots__ = ["length_from"]
def __init__(self, name, default, length=None, length_from=None):
self.length_from = length_from
if length:
self.length_from = lambda p, l=length: l
Field.__init__(self, name, default, "!Q")
def addfield(self, pkt, s, val):
length = self.length_from(pkt)
return s + struct.pack(self.fmt, self.i2m(pkt, val))[8-length:]
def getfield(self, pkt, s):
length = self.length_from(pkt)
val = struct.unpack(self.fmt, b"\x00"*(8 - length) + s[:length])[0]
return s[length:], self.m2i(pkt, val)
class Profisafe(PNIORealTimeRawData):
"""PROFISafe profil to be encapsulated inside the PNRT.data list.
It's a configurable packet whose config includes a fix length, and a CRC
length. The config parameter must then be a dict {"length": X, "CRC": Y}.
"""
name = "PROFISafe"
fields_desc = [
StrFixedLenField("load", "", length_from=lambda p: p[Profisafe].data_length()),
XByteField("Control_Status", 0),
XVarBytesField("CRC", 0, length_from=lambda p: p[Profisafe].crc_length())
]
def data_length(self):
"""Return the length of the data"""
ret = self.length() - self.crc_length() - 1
return ret
def crc_length(self):
"""Return the length of the crc"""
return self._config["CRC"]