# blob: 88b5969cbaa01d3e24a2b9a1a6bd51d43c1b7997 [file] [log] [blame]
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import logging
import os
import threading
import time
logger = logging.getLogger(__name__)


class Denylist(object):
  """Reads and updates a JSON file that records devices to be avoided.

  Every file access made through an instance is serialized by a per-instance
  re-entrant lock. The lock does not coordinate with other processes (or
  other instances) touching the same path, so the file operations below are
  written to tolerate the file disappearing underneath them.

  Entries added through Extend have the form:
    {device: {'timestamp': <float seconds since epoch>, 'reason': <str>}}
  """

  def __init__(self, path):
    """Creates an accessor for the denylist stored at |path|.

    Args:
      path: filesystem path of the JSON denylist file. The file does not
        need to exist yet.
    """
    # Re-entrant because Extend holds the lock while calling Read and Write,
    # which acquire it again.
    self._denylist_lock = threading.RLock()
    self._path = path

  def _RemoveFile(self):
    """Deletes the denylist file, tolerating it already being gone.

    Raises:
      OSError: if the removal failed and the file is still present.
    """
    try:
      os.remove(self._path)
    except OSError:
      # Something else may have deleted the file first; that is the outcome
      # we wanted anyway. Only propagate when the file is really still there.
      if os.path.exists(self._path):
        raise

  def Read(self):
    """Reads the denylist from the denylist file.

    A file that cannot be opened or parsed as JSON is logged and deleted.
    A file holding valid JSON that is not an object is logged and ignored
    (but left in place).

    Returns:
      A dict containing bad devices; empty if the file is missing,
      unreadable, or does not contain a JSON object.

    Raises:
      OSError: if an unreadable file could not be deleted.
    """
    with self._denylist_lock:
      if not os.path.exists(self._path):
        return {}

      denylist = {}
      try:
        with open(self._path, 'r') as f:
          denylist = json.load(f)
      except (IOError, ValueError) as e:
        logger.warning('Unable to read denylist: %s', e)
        # The file may have vanished between the existence check and open();
        # _RemoveFile copes with that instead of raising from this handler.
        self._RemoveFile()

      if not isinstance(denylist, dict):
        logger.warning('Ignoring %s: %s (a dict was expected instead)',
                       self._path, denylist)
        return {}

      return denylist

  def Write(self, denylist):
    """Writes the provided denylist to the denylist file.

    The file is overwritten, not merged; use Extend to add entries.

    Args:
      denylist: JSON-serializable object to store. Read only returns the
        content when it is a dict keyed by device, so that is the expected
        shape.
    """
    with self._denylist_lock:
      with open(self._path, 'w') as f:
        json.dump(denylist, f)

  def Extend(self, devices, reason='unknown'):
    """Adds devices to denylist file.

    Existing entries for the same devices are replaced. All devices added by
    one call share the same timestamp and reason.

    Args:
      devices: iterable of bad devices (strings) to be added to the denylist
        file.
      reason: string specifying the reason for denylist (eg: 'unauthorized')
    """
    # Materialize once: |devices| may be a one-shot iterator and is consumed
    # both by the log message and when building the new entries.
    devices = list(devices)
    event_info = {
        'timestamp': time.time(),
        'reason': reason,
    }
    device_dicts = {device: event_info for device in devices}
    logger.info('Adding %s to denylist %s for reason: %s', ','.join(devices),
                self._path, reason)
    # Hold the lock across the whole read-modify-write so that concurrent
    # Extend calls on this instance cannot lose each other's entries.
    with self._denylist_lock:
      denylist = self.Read()
      denylist.update(device_dicts)
      self.Write(denylist)

  def Reset(self):
    """Erases the denylist file if it exists.

    Raises:
      OSError: if the file exists but could not be deleted.
    """
    logger.info('Resetting denylist %s', self._path)
    with self._denylist_lock:
      self._RemoveFile()