| import time |
| import datetime |
| |
| from django import forms |
| from django.forms.util import ErrorDict |
| from django.conf import settings |
| from django.contrib.contenttypes.models import ContentType |
| from models import Comment |
| from django.utils.crypto import salted_hmac, constant_time_compare |
| from django.utils.encoding import force_unicode |
| from django.utils.hashcompat import sha_constructor |
| from django.utils.text import get_text_list |
| from django.utils.translation import ungettext, ugettext_lazy as _ |
| |
| COMMENT_MAX_LENGTH = getattr(settings,'COMMENT_MAX_LENGTH', 3000) |
| |
| class CommentSecurityForm(forms.Form): |
| """ |
| Handles the security aspects (anti-spoofing) for comment forms. |
| """ |
| content_type = forms.CharField(widget=forms.HiddenInput) |
| object_pk = forms.CharField(widget=forms.HiddenInput) |
| timestamp = forms.IntegerField(widget=forms.HiddenInput) |
| security_hash = forms.CharField(min_length=40, max_length=40, widget=forms.HiddenInput) |
| |
| def __init__(self, target_object, data=None, initial=None): |
| self.target_object = target_object |
| if initial is None: |
| initial = {} |
| initial.update(self.generate_security_data()) |
| super(CommentSecurityForm, self).__init__(data=data, initial=initial) |
| |
| def security_errors(self): |
| """Return just those errors associated with security""" |
| errors = ErrorDict() |
| for f in ["honeypot", "timestamp", "security_hash"]: |
| if f in self.errors: |
| errors[f] = self.errors[f] |
| return errors |
| |
| def clean_security_hash(self): |
| """Check the security hash.""" |
| security_hash_dict = { |
| 'content_type' : self.data.get("content_type", ""), |
| 'object_pk' : self.data.get("object_pk", ""), |
| 'timestamp' : self.data.get("timestamp", ""), |
| } |
| expected_hash = self.generate_security_hash(**security_hash_dict) |
| actual_hash = self.cleaned_data["security_hash"] |
| if not constant_time_compare(expected_hash, actual_hash): |
| # Fallback to Django 1.2 method for compatibility |
| # PendingDeprecationWarning <- here to remind us to remove this |
| # fallback in Django 1.5 |
| expected_hash_old = self._generate_security_hash_old(**security_hash_dict) |
| if not constant_time_compare(expected_hash_old, actual_hash): |
| raise forms.ValidationError("Security hash check failed.") |
| return actual_hash |
| |
| def clean_timestamp(self): |
| """Make sure the timestamp isn't too far (> 2 hours) in the past.""" |
| ts = self.cleaned_data["timestamp"] |
| if time.time() - ts > (2 * 60 * 60): |
| raise forms.ValidationError("Timestamp check failed") |
| return ts |
| |
| def generate_security_data(self): |
| """Generate a dict of security data for "initial" data.""" |
| timestamp = int(time.time()) |
| security_dict = { |
| 'content_type' : str(self.target_object._meta), |
| 'object_pk' : str(self.target_object._get_pk_val()), |
| 'timestamp' : str(timestamp), |
| 'security_hash' : self.initial_security_hash(timestamp), |
| } |
| return security_dict |
| |
| def initial_security_hash(self, timestamp): |
| """ |
| Generate the initial security hash from self.content_object |
| and a (unix) timestamp. |
| """ |
| |
| initial_security_dict = { |
| 'content_type' : str(self.target_object._meta), |
| 'object_pk' : str(self.target_object._get_pk_val()), |
| 'timestamp' : str(timestamp), |
| } |
| return self.generate_security_hash(**initial_security_dict) |
| |
| def generate_security_hash(self, content_type, object_pk, timestamp): |
| """ |
| Generate a HMAC security hash from the provided info. |
| """ |
| info = (content_type, object_pk, timestamp) |
| key_salt = "django.contrib.forms.CommentSecurityForm" |
| value = "-".join(info) |
| return salted_hmac(key_salt, value).hexdigest() |
| |
| def _generate_security_hash_old(self, content_type, object_pk, timestamp): |
| """Generate a (SHA1) security hash from the provided info.""" |
| # Django 1.2 compatibility |
| info = (content_type, object_pk, timestamp, settings.SECRET_KEY) |
| return sha_constructor("".join(info)).hexdigest() |
| |
| class CommentDetailsForm(CommentSecurityForm): |
| """ |
| Handles the specific details of the comment (name, comment, etc.). |
| """ |
| name = forms.CharField(label=_("Name"), max_length=50) |
| email = forms.EmailField(label=_("Email address")) |
| url = forms.URLField(label=_("URL"), required=False) |
| comment = forms.CharField(label=_('Comment'), widget=forms.Textarea, |
| max_length=COMMENT_MAX_LENGTH) |
| |
| def get_comment_object(self): |
| """ |
| Return a new (unsaved) comment object based on the information in this |
| form. Assumes that the form is already validated and will throw a |
| ValueError if not. |
| |
| Does not set any of the fields that would come from a Request object |
| (i.e. ``user`` or ``ip_address``). |
| """ |
| if not self.is_valid(): |
| raise ValueError("get_comment_object may only be called on valid forms") |
| |
| CommentModel = self.get_comment_model() |
| new = CommentModel(**self.get_comment_create_data()) |
| new = self.check_for_duplicate_comment(new) |
| |
| return new |
| |
| def get_comment_model(self): |
| """ |
| Get the comment model to create with this form. Subclasses in custom |
| comment apps should override this, get_comment_create_data, and perhaps |
| check_for_duplicate_comment to provide custom comment models. |
| """ |
| return Comment |
| |
| def get_comment_create_data(self): |
| """ |
| Returns the dict of data to be used to create a comment. Subclasses in |
| custom comment apps that override get_comment_model can override this |
| method to add extra fields onto a custom comment model. |
| """ |
| return dict( |
| content_type = ContentType.objects.get_for_model(self.target_object), |
| object_pk = force_unicode(self.target_object._get_pk_val()), |
| user_name = self.cleaned_data["name"], |
| user_email = self.cleaned_data["email"], |
| user_url = self.cleaned_data["url"], |
| comment = self.cleaned_data["comment"], |
| submit_date = datetime.datetime.now(), |
| site_id = settings.SITE_ID, |
| is_public = True, |
| is_removed = False, |
| ) |
| |
| def check_for_duplicate_comment(self, new): |
| """ |
| Check that a submitted comment isn't a duplicate. This might be caused |
| by someone posting a comment twice. If it is a dup, silently return the *previous* comment. |
| """ |
| possible_duplicates = self.get_comment_model()._default_manager.using( |
| self.target_object._state.db |
| ).filter( |
| content_type = new.content_type, |
| object_pk = new.object_pk, |
| user_name = new.user_name, |
| user_email = new.user_email, |
| user_url = new.user_url, |
| ) |
| for old in possible_duplicates: |
| if old.submit_date.date() == new.submit_date.date() and old.comment == new.comment: |
| return old |
| |
| return new |
| |
| def clean_comment(self): |
| """ |
| If COMMENTS_ALLOW_PROFANITIES is False, check that the comment doesn't |
| contain anything in PROFANITIES_LIST. |
| """ |
| comment = self.cleaned_data["comment"] |
| if settings.COMMENTS_ALLOW_PROFANITIES == False: |
| bad_words = [w for w in settings.PROFANITIES_LIST if w in comment.lower()] |
| if bad_words: |
| plural = len(bad_words) > 1 |
| raise forms.ValidationError(ungettext( |
| "Watch your mouth! The word %s is not allowed here.", |
| "Watch your mouth! The words %s are not allowed here.", plural) % \ |
| get_text_list(['"%s%s%s"' % (i[0], '-'*(len(i)-2), i[-1]) for i in bad_words], 'and')) |
| return comment |
| |
| class CommentForm(CommentDetailsForm): |
| honeypot = forms.CharField(required=False, |
| label=_('If you enter anything in this field '\ |
| 'your comment will be treated as spam')) |
| |
| def clean_honeypot(self): |
| """Check that nothing's been entered into the honeypot.""" |
| value = self.cleaned_data["honeypot"] |
| if value: |
| raise forms.ValidationError(self.fields["honeypot"].label) |
| return value |