"""
Helpers module. This module can be safely imported from any fobi (sub)module,
since it never imports from any of the fobi (sub)modules (except for the
`fobi.constants` and `fobi.exceptions` modules).
"""

__title__ = 'fobi.helpers'
__author__ = 'Artur Barseghyan '
__copyright__ = 'Copyright (c) 2014 Artur Barseghyan'
__license__ = 'GPL 2.0/LGPL 2.1'
__all__ = (
    'do_slugify', 'lists_overlap', 'iterable_to_dict',
    'map_field_name_to_label', 'clean_dict', 'two_dicts_to_string',
    'empty_string', 'ensure_unique_filename', 'handle_uploaded_file',
    'delete_file', 'clone_file', 'get_registered_models',
    'admin_change_url', 'uniquify_sequence', 'safe_text', 'combine_dicts',
    'update_plugin_data', 'get_select_field_choices',
    'validate_initial_for_choices', 'validate_initial_for_multiple_choices',
    'validate_submit_value_as', 'get_app_label_and_model_name',
    'StrippedUser', 'StrippedRequest', 'JSONDataExporter',
)

import os
import glob
import logging
import uuid
import shutil

from six import text_type, PY3

import simplejson as json

from django.conf import settings
from django.core.urlresolvers import reverse
from django.core.files.base import File
from django.contrib.contenttypes.models import ContentType
from django.db.utils import DatabaseError
from django.utils.encoding import force_text
from django import forms
from django.utils.translation import ugettext_lazy as _
from django.contrib.auth.models import AnonymousUser
from django.test.client import RequestFactory
from django.http import HttpResponse

from autoslug.settings import slugify

from nine.user import User
from nine.versions import DJANGO_GTE_1_7

from fobi.constants import (
    SUBMIT_VALUE_AS_VAL, SUBMIT_VALUE_AS_REPR, SUBMIT_VALUE_AS_MIX
)
from fobi.exceptions import ImproperlyConfigured

logger = logging.getLogger(__name__)

# *****************************************************************************
# *****************************************************************************
# ********************************** General **********************************
# *****************************************************************************
# *****************************************************************************


def do_slugify(s):
    """
    Slugifies the given string.

    The value is lower-cased both before and after being passed to the
    ``slugify`` function imported from ``autoslug.settings``.

    :param str s:
    :return str:
    """
    return slugify(s.lower()).lower()


def safe_text(text):
    """
    Safe text (encode).

    On Python 3 the text is returned as forced by ``force_text``. On
    Python 2 that result is additionally encoded to UTF-8.

    :param text: Value to convert.
    :return str:
    """
    forced = force_text(text, encoding='utf-8')
    if PY3:
        return forced
    return forced.encode('utf-8')


def lists_overlap(sub, main):
    """
    Checks whether at least one item of ``sub`` is present in ``main``.

    :param iterable sub:
    :param iterable main:
    :return bool:
    """
    return any(item in main for item in sub)


def iterable_to_dict(items, key_attr_name):
    """
    Converts iterable of certain objects to dict.

    If several items share the same key, the last one wins.

    :param iterable items:
    :param string key_attr_name: Attribute to use as a dictionary key.
    :return dict:
    """
    return dict((getattr(item, key_attr_name), item) for item in items)


def map_field_name_to_label(form):
    """
    Takes a form and creates a field name to label map.

    :param django.forms.Form form: Instance of ``django.forms.Form``.
    :return dict: Keys are the names found in ``form.base_fields``, values
        are the ``label`` attributes of the corresponding fields.
    """
    return dict(
        (field_name, field.label)
        for (field_name, field) in form.base_fields.items()
    )


def clean_dict(source, keys=(), values=()):
    """
    Removes given keys and values from dictionary.

    An item is dropped if its key is in ``keys`` or its value is in
    ``values``. The ``source`` dictionary itself is not modified.

    :param dict source:
    :param iterable keys:
    :param iterable values:
    :return dict: New dictionary with the remaining items.
    """
    return dict(
        (key, value)
        for key, value in source.items()
        if key not in keys and value not in values
    )


def combine_dicts(headers, data):
    """
    Takes two dictionaries, assuming one contains a mapping keys to titles
    and another keys to data. Pairs each title with its data.

    :param dict headers: Mapping of keys to titles.
    :param dict data: Mapping of keys to data. Keys missing here produce an
        empty string.
    :return list: List of ``(title, data value)`` tuples, in the iteration
        order of ``headers``.
    """
    return [(title, data.get(key, '')) for key, title in headers.items()]


def two_dicts_to_string(headers, data, html_element='p'):
    """
    Takes two dictionaries, assuming one contains a mapping keys to titles
    and another keys to data. Joins as string and returns wrapped into
    HTML "p" tag.

    :param dict headers: Mapping of keys to titles.
    :param dict data: Mapping of keys to data.
    :param str html_element: Name of the HTML element each "title: value"
        pair is wrapped into.
    :return str:
    """
    # NOTE(review): titles and values are inserted as-is (not HTML-escaped);
    # confirm that callers only pass trusted or pre-escaped content.
    return "".join(
        # The element name is passed twice: once for the opening and once
        # for the closing tag.
        "<{0}>{1}: {2}</{3}>".format(
            html_element, safe_text(title), safe_text(value), html_element
        )
        for title, value in combine_dicts(headers, data)
    )


empty_string = text_type('')


def uniquify_sequence(sequence):
    """
    Makes sure items in the given sequence are unique, having the original
    order preserved.

    :param iterable sequence: Items should be hashable, since they are put
        into a ``set``.
    :return list:
    """
    seen = set()
    seen_add = seen.add
    # ``seen_add`` always returns None, thus ``not seen_add(x)`` is always
    # True and only serves to record the item as seen.
    return [x for x in sequence if x not in seen and not seen_add(x)]


def get_ignorable_form_values():
    """
    Gets an iterable of form values to ignore.

    :return iterable:
    """
    return [None, empty_string]

# *****************************************************************************
# *****************************************************************************
# ****************************** File helpers *********************************
# *****************************************************************************
# *****************************************************************************


def ensure_unique_filename(destination):
    """
    Makes sure filenames are never overwritten.

    If ``destination`` already exists, a UUID is inserted between the file
    name and its extension. Otherwise ``destination`` is returned unchanged.

    :param string destination:
    :return string:
    """
    if not os.path.exists(destination):
        return destination

    filename, extension = os.path.splitext(destination)
    return "{0}_{1}{2}".format(filename, uuid.uuid4(), extension)


def handle_uploaded_file(upload_dir, image_file):
    """
    Saves the uploaded file into the ``upload_dir`` (located under
    ``settings.MEDIA_ROOT``).

    :param string upload_dir: Directory, joined onto
        ``settings.MEDIA_ROOT``.
    :param django.core.files.uploadedfile.InMemoryUploadedFile image_file:
    :return string: Path to the image (relative). If ``image_file`` is not
        an instance of ``django.core.files.base.File``, it is returned as is.
    """
    upload_dir_absolute_path = os.path.join(settings.MEDIA_ROOT, upload_dir)

    # Create path if doesn't exist yet
    if not os.path.exists(upload_dir_absolute_path):
        os.makedirs(upload_dir_absolute_path)

    if not isinstance(image_file, File):
        return image_file

    destination_path = ensure_unique_filename(
        os.path.join(upload_dir_absolute_path, image_file.name)
    )
    with open(destination_path, 'wb+') as destination:
        # The stored name may differ from the uploaded one, if the latter
        # was already taken.
        image_filename = os.path.basename(destination.name)
        for chunk in image_file.chunks():
            destination.write(chunk)
    return os.path.join(upload_dir, image_filename)


def delete_file(image_file):
    """
    Delete file from disc.

    :param string image_file: Path, joined onto ``settings.MEDIA_ROOT``.
    :return bool: True if the main file has been removed, False otherwise.
        Failures are logged (debug level), not raised.
    """
    try:
        # Delete the main file.
        file_path = os.path.join(settings.MEDIA_ROOT, image_file)
        os.remove(file_path)

        # Delete the sized version of it (every file whose path starts
        # with the path of the main file).
        files = glob.glob("{0}*".format(file_path))
        for f in files:
            try:
                os.remove(f)
            except Exception as e:
                logger.debug(str(e))

        # If all goes well...
        return True
    except Exception as e:
        logger.debug(str(e))
        return False


def clone_file(upload_dir, source_filename, relative_path=True):
    """
    Clones the file.

    :param string upload_dir: If ``source_filename`` starts with it,
        ``source_filename`` is joined onto ``settings.MEDIA_ROOT``.
    :param string source_filename: Source filename.
    :param bool relative_path: If True, ``settings.MEDIA_ROOT`` and the
        leading slash are stripped from the returned filename.
    :return string: Filename of the cloned file. None if copying failed
        (the error is logged at debug level).
    """
    if source_filename.startswith(upload_dir):
        source_filename = os.path.join(settings.MEDIA_ROOT, source_filename)

    destination_filename = ensure_unique_filename(source_filename)
    try:
        shutil.copyfile(source_filename, destination_filename)
        if relative_path:
            destination_filename = destination_filename.replace(
                settings.MEDIA_ROOT, ''
            )
            if destination_filename.startswith('/'):
                destination_filename = destination_filename[1:]
        return destination_filename
    except Exception as e:
        logger.debug(str(e))
        return None


def extract_file_path(name):
    """
    Extracts the file path.

    :param string name:
    :return string: ``name`` joined onto ``settings.MEDIA_ROOT``.
    """
    return os.path.join(settings.MEDIA_ROOT, name)

# *****************************************************************************
# *****************************************************************************
# ****************************** Model helpers ********************************
# *****************************************************************************
# *****************************************************************************


def get_registered_models(ignore=()):
    """
    Gets registered models as list.

    :param iterable ignore: Ignore the following content types (should
        be in ``app_label.model`` format (example ``auth.User``).
    :return list: List of ``("app_label.model", name)`` tuples. Empty or
        partial if a ``DatabaseError`` occurs (logged at debug level).
    """
    registered_models = []
    try:
        content_types = ContentType._default_manager.all()
        for content_type in content_types:
            content_type_id = "{0}.{1}".format(
                content_type.app_label, content_type.model
            )
            if content_type_id not in ignore:
                registered_models.append((content_type_id, content_type.name))
    except DatabaseError as e:
        logger.debug(str(e))
    return registered_models


def get_app_label_and_model_name(path):
    """
    Gets app_label and model_name from the path given.

    :param str path: Dotted path to the model (without ".model", as stored
        in the Django `ContentType` model.
    :return tuple: app_label, model_name
    """
    parts = path.split('.')
    # Everything before the last dot is concatenated (without a separator)
    # into the app label.
    return (''.join(parts[:-1]), parts[-1])

# *****************************************************************************
# *****************************************************************************
# ****************************** Admin helpers ********************************
# *****************************************************************************
# *****************************************************************************


def admin_change_url(app_label, module_name, object_id, extra_path='',
                     url_title=None):
    """
    Gets an admin change URL for the object given.

    :param str app_label:
    :param str module_name:
    :param int object_id:
    :param str extra_path: Appended to the reversed URL.
    :param str url_title: If given, an HTML a tag is returned with
        `url_title` as the tag title. If left to None just the URL string
        is returned.
    :return str: None if the URL could not be built.
    """
    try:
        url = reverse(
            'admin:{0}_{1}_change'.format(app_label, module_name),
            args=[object_id]
        ) + extra_path
        if url_title:
            return u'<a href="{0}">{1}</a>'.format(url, url_title)
        return url
    except Exception:
        # Best effort: any failure (for example, the URL can't be reversed)
        # results in None.
        return None

# *****************************************************************************
# *****************************************************************************
# ****************************** Fobi data helpers ****************************
# *****************************************************************************
# *****************************************************************************


def update_plugin_data(entry, request=None):
    """
    Update plugin data of a given entry.

    :param entry: Object providing a ``get_plugin`` method.
    :param request: Passed on to ``entry.get_plugin``.
    :return: Result of the ``_update_plugin_data`` method of the plugin.
        None if either the entry or its plugin is falsy.
    """
    if not entry:
        return None

    plugin = entry.get_plugin(request=request)
    logger.debug(plugin)
    if plugin:
        return plugin._update_plugin_data(entry)
    return None


def get_select_field_choices(raw_choices_data):
    """
    Used in ``radio``, ``select`` and other choice based fields.

    Each line of the raw data is either a comma separated "key, value" pair
    (split on the first comma only) or a single value, which then acts as
    both key and value. Blank lines, as well as lines repeating an already
    used key or value, are skipped.

    :param str raw_choices_data:
    :return list: List of ``(key, value)`` tuples.
    """
    choices = []  # Holds return value
    keys = set()  # For checking uniqueness of keys
    values = set()  # For checking uniqueness of values

    # Looping through the raw data
    for choice in raw_choices_data.split('\n'):
        choice = choice.strip()

        if ',' in choice:
            # If comma separated key, value
            key, value = choice.split(',', 1)
            key = key.strip()
            value = value.strip()
        else:
            # If key is also the value
            key = value = choice

        if key and key not in keys and value not in values:
            choices.append((key, value))
            keys.add(key)
            values.add(value)

    return choices


def validate_initial_for_choices(plugin_form, field_name_choices='choices',
                                 field_name_initial='initial'):
    """
    Validates the initial value for the choices given.

    :param fobi.base.BaseFormFieldPluginForm plugin_form:
    :param str field_name_choices: Key of the raw choices in the
        ``cleaned_data`` of the form.
    :param str field_name_initial: Key of the initial value in the
        ``cleaned_data`` of the form.
    :return: The initial value, as found in the ``cleaned_data``.
    :raises django.forms.ValidationError: If the initial value is not empty
        and is not among the keys of the choices.
    """
    # Keys are unique already; a list keeps them in the order entered.
    available_choices = [
        key for key, value
        in get_select_field_choices(
            plugin_form.cleaned_data[field_name_choices]
        )
    ]
    initial = plugin_form.cleaned_data[field_name_initial]

    if initial and initial not in available_choices:
        # Translate first, format after - otherwise the already formatted
        # string is used for the translation lookup.
        raise forms.ValidationError(
            _("Invalid value for initial: {0}. Should be any of the following"
              ": {1}").format(initial, ','.join(available_choices))
        )

    return initial


def validate_initial_for_multiple_choices(plugin_form,
                                          field_name_choices='choices',
                                          field_name_initial='initial'):
    """
    Validates the initial value for the multiple choices given.

    The initial value is treated as a comma separated list, each
    (stripped) item of which shall be among the keys of the choices.

    :param fobi.base.BaseFormFieldPluginForm plugin_form:
    :param str field_name_choices: Key of the raw choices in the
        ``cleaned_data`` of the form.
    :param str field_name_initial: Key of the initial value in the
        ``cleaned_data`` of the form.
    :return: The initial value, as found in the ``cleaned_data``.
    :raises django.forms.ValidationError: On the first item which is not
        among the keys of the choices.
    """
    # Keys are unique already; a list keeps them in the order entered.
    available_choices = [
        key for key, value
        in get_select_field_choices(
            plugin_form.cleaned_data[field_name_choices]
        )
    ]
    initial = plugin_form.cleaned_data[field_name_initial]

    if initial:
        for choice in initial.split(','):
            choice = choice.strip()
            if choice not in available_choices:
                # Translate first, format after - otherwise the already
                # formatted string is used for the translation lookup.
                raise forms.ValidationError(
                    _("Invalid value for initial: {0}. Should be any "
                      "of the following: {1}"
                      "").format(choice, ','.join(available_choices))
                )

    return initial


def validate_submit_value_as(value):
    """
    Validates the `SUBMIT_AS_VALUE`.

    :param str value:
    :raises fobi.exceptions.ImproperlyConfigured: If ``value`` is none of
        ``SUBMIT_VALUE_AS_VAL``, ``SUBMIT_VALUE_AS_REPR`` and
        ``SUBMIT_VALUE_AS_MIX``.
    """
    allowed_values = (
        SUBMIT_VALUE_AS_VAL, SUBMIT_VALUE_AS_REPR, SUBMIT_VALUE_AS_MIX
    )
    if value not in allowed_values:
        raise ImproperlyConfigured("The `SUBMIT_AS_VALUE` may have one of "
                                   "the following values: {0}, {1} or {2}"
                                   "".format(SUBMIT_VALUE_AS_VAL,
                                             SUBMIT_VALUE_AS_REPR,
                                             SUBMIT_VALUE_AS_MIX))


class StrippedUser(object):
    """
    Stripped user object.

    Wraps a user object, exposing the e-mail, the username, the full and
    short names and the anonymity check only.
    """
    def __init__(self, user):
        """
        :param user: User object to wrap. Note, that the attribute named by
            ``User.USERNAME_FIELD`` is (re)set on it: to the username for
            non-anonymous users and to None for anonymous ones.
        :return:
        """
        self._user = user

        if not self._user.is_anonymous():
            setattr(self._user, User.USERNAME_FIELD,
                    self._user.get_username())
        else:
            setattr(self._user, User.USERNAME_FIELD, None)

    @property
    def email(self):
        """
        E-mail of the wrapped user.
        """
        return self._user.email

    def get_username(self):
        """
        Gets the username of the wrapped user.

        :return: None for anonymous users or on failure.
        """
        if not self._user.is_anonymous():
            try:
                return self._user.get_username()
            except Exception as err:
                logger.debug(str(err))
        return None

    def get_full_name(self):
        """
        Gets the full name of the wrapped user.

        :return: None for anonymous users or on failure.
        """
        if not self._user.is_anonymous():
            try:
                return self._user.get_full_name()
            except Exception as err:
                logger.debug(str(err))
        return None

    def get_short_name(self):
        """
        Gets the short name of the wrapped user.

        :return: None for anonymous users or on failure.
        """
        if not self._user.is_anonymous():
            try:
                return self._user.get_short_name()
            except Exception as err:
                logger.debug(str(err))
        return None

    def is_anonymous(self):
        """
        Tells whether the wrapped user is anonymous.
        """
        return self._user.is_anonymous()


class StrippedRequest(object):
    """
    Stripped request object.

    Wraps a request, exposing the path, the security/AJAX checks, a fixed
    subset of the ``META`` and the (stripped) user only.
    """
    # Keys of the request ``META`` exposed by the ``META`` property.
    _META_KEYS = (
        'HTTP_ACCEPT_ENCODING',
        'HTTP_ACCEPT_LANGUAGE',
        'HTTP_HOST',
        'HTTP_REFERER',
        'HTTP_USER_AGENT',
        'QUERY_STRING',
        'REMOTE_ADDR',
    )

    def __init__(self, request):
        """
        :param django.http.HttpRequest request:
        :return:
        """
        # Just to make sure nothing breaks if we don't provide the request
        # object, we do fall back to a fake request object.
        if request:
            self._request = request
        else:
            request_factory = RequestFactory()
            self._request = request_factory.get('/')

        # The given request (not the fallback one) is checked here; the
        # fake request thus always results in an anonymous user.
        if hasattr(request, 'user') and request.user:
            self.user = StrippedUser(self._request.user)
        else:
            self.user = StrippedUser(AnonymousUser())

    @property
    def path(self):
        """
        A string representing the full path to the requested page, not
        including the scheme or domain.
        """
        return self._request.path

    def get_full_path(self):
        """
        Returns the path, plus an appended query string, if applicable.
        """
        return self._request.get_full_path()

    def is_secure(self):
        """
        Returns True if the request is secure; that is, if it was made with
        HTTPS.
        """
        return self._request.is_secure()

    def is_ajax(self):
        """
        Returns True if the request was made via an XMLHttpRequest, by
        checking the HTTP_X_REQUESTED_WITH header for the string
        'XMLHttpRequest'
        """
        return self._request.is_ajax()

    @property
    def META(self):
        """
        A standard Python dictionary containing all available HTTP headers.
        Available headers depend on the client and server, but here are
        some examples:

        - HTTP_ACCEPT_ENCODING: Acceptable encodings for the response.
        - HTTP_ACCEPT_LANGUAGE: Acceptable languages for the response.
        - HTTP_HOST: The HTTP Host header sent by the client.
        - HTTP_REFERER: The referring page, if any.
        - HTTP_USER_AGENT: The clients user-agent string.
        - QUERY_STRING: The query string, as a single (unparsed) string.
        - REMOTE_ADDR: The IP address of the client.

        Only the keys listed above are exposed. Keys missing from the
        request are mapped to None.
        """
        request_meta = self._request.META
        return dict((key, request_meta.get(key)) for key in self._META_KEYS)


class JSONDataExporter(object):
    """
    Exporting the data into JSON.
    """
    def __init__(self, data, filename):
        """
        :param str data: Dumped JSON data (`json.dumps()`).
        :param str filename: File name prefix.
        """
        self.data = data
        self.filename = filename

    def _get_initial_response(self, mimetype="application/json"):
        """
        For compatibility with older versions (`mimetype` vs `content_type`).

        :param str mimetype:
        :return django.http.HttpResponse:
        """
        response_kwargs = {}
        if DJANGO_GTE_1_7:
            response_kwargs['content_type'] = mimetype
        else:
            response_kwargs['mimetype'] = mimetype
        return HttpResponse(**response_kwargs)

    def export_to_json(self):
        """
        Export data to JSON.

        :return django.http.HttpResponse: Response with the data served as
            an attachment named after the file name prefix.
        """
        response = self._get_initial_response(mimetype="text/json")
        response['Content-Disposition'] = \
            'attachment; filename={0}.json'.format(self.filename)
        response.write(self.data)
        return response

    def export(self):
        """
        Shortcut to ``export_to_json``.
        """
        return self.export_to_json()