# Source code for sandpaper.sandpaper

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2017 Stephen Bunn (stephen@bunn.io)
# MIT License <https://opensource.org/licenses/MIT>

import os
import hashlib
import warnings
import datetime
import functools
import collections

import six
import regex
import pyexcel


def value_rule(func):
    """ Turn a method into a chainable registration of a value rule.

    .. note:: Value rules take in a full record and a column name as
        implicit parameters. They are expected to return the value at
        ``record[column]`` after it has been normalized by the rule.

    Calling the decorated method does not run ``func``. Instead ``func`` is
    added to the instance's ``value_rules`` set, a ``(func, args, kwargs)``
    entry is appended to the instance's ``rules`` list and the instance
    itself is returned so that calls can be chained.

    :param callable func: The normalization rule
    :returns: The wrapped normalization rule
    :rtype: callable
    """

    @functools.wraps(func)
    def register(self, *rule_args, **rule_kwargs):
        entry = (func, rule_args, rule_kwargs,)
        self.value_rules.add(func)
        self.rules.append(entry)
        return self

    return register
def record_rule(func):
    """ Turn a method into a chainable registration of a record rule.

    .. note:: Record rules are applied after all value rules have been
        applied to a record. They take in a full record as an implicit
        parameter and are expected to return the normalized record back.

    Calling the decorated method does not run ``func``. Instead ``func`` is
    added to the instance's ``record_rules`` set, a ``(func, args, kwargs)``
    entry is appended to the instance's ``rules`` list and the instance
    itself is returned so that calls can be chained.

    :param callable func: The normalization rule
    :returns: The wrapped normalization rule
    :rtype: callable
    """

    @functools.wraps(func)
    def register(self, *rule_args, **rule_kwargs):
        entry = (func, rule_args, rule_kwargs,)
        self.record_rules.add(func)
        self.rules.append(entry)
        return self

    return register
class SandPaper(object):
    """ The SandPaper object.

    Allows chained data normalization across multiple different table type
    data files such as ``.csv``, ``.xls``, and ``.xlsx``.
    """

    # names of rule keyword arguments that are interpreted as filters
    __available_filters = (
        'column_filter', 'value_filter', 'callable_filter',
    )
    # class-level fallback only; every instance / run works on its own
    # dictionary so that counts are never shared between instances
    __rule_stats = {}
    # default keyword arguments merged into the file reading arguments
    __default_apply = {
        'auto_detect_datetime': False,
    }

    def __init__(self, name=None):
        """ Initializes the SandPaper object.

        .. note:: If a descriptive name is not provided, the name references
            a continually updating uid hash of the active rules.

        :param str name: The descriptive name of the SandPaper object
        """

        # shadow the class-level dictionary so that mutating the statistics
        # through ``self`` can never touch state shared by other instances
        self.__rule_stats = {}
        if name is not None:
            self.name = name

    def __repr__(self):
        """ Returns a string representation of a SandPaper instance.

        :returns: A string representation of a SandPaper instance
        :rtype: str
        """

        return (
            '<{self.__class__.__name__} ({self.uid}) "{self.name}">'
        ).format(self=self)

    def __eq__(self, other):
        """ Evaluates if two instances are the same.

        .. note:: Name is not taken into consideration for instance equality,
            only the ``uid`` of the active rules is compared.

        :param other: The object to compare against
        :returns: A boolean if two instances are the same, or
            ``NotImplemented`` when ``other`` is not a SandPaper instance
        :rtype: bool
        """

        # hashing an arbitrary (possibly unhashable) object would raise a
        # TypeError, defer to the default comparison for foreign types
        if not isinstance(other, SandPaper):
            return NotImplemented
        return self.uid == other.uid

    def __hash__(self):
        """ Returns an identifying integer for the calling SandPaper instance.

        .. note:: The value is derived from ``uid`` and therefore changes
            whenever a rule is added to the instance.

        :returns: An identifying integer for the calling SandPaper instance
        :rtype: int
        """

        return int(self.uid, 16)

    def __json__(self):
        """ The current instance to a dictionary suitable for json encoding.

        .. note:: Raises a UserWarning when a callable is discovered as a
            critical part of a rule

        :returns: A dictionary suitable for json encoding
        :rtype: dict
        """

        return {
            'name': self.name,
            'uid': self.uid,
            'rules': [(
                rule.__name__,
                self.__jsonify(rule_args, warn=True),
                self.__jsonify(rule_kwargs, warn=True)
            ) for (rule, rule_args, rule_kwargs,) in self.rules]
        }

    def __jsonify(self, value, warn=False):
        """ Custom jsonification simplification of random values.

        Dictionaries are rebuilt recursively, lists / sets / tuples are
        rebuilt as lists, callables are dropped and anything else is
        returned as it is. Entries that jsonify to ``None`` are left out of
        rebuilt containers.

        .. note:: Raises a UserWarning when a callable is discovered as a
            critical part of a rule

        :param value: A value to jsonify
        :param bool warn: A flag to indicate if warnings need to be raised
        :returns: The jsonification of the passed value
        """

        if isinstance(value, dict):
            rebuild = {}
            for (key, item,) in value.items():
                jsonified = self.__jsonify(item, warn=warn)
                if jsonified is not None:
                    # store the simplified value (not the raw one) so nested
                    # callables, tuples and sets do not leak into the result
                    rebuild[key] = jsonified
            return rebuild
        if isinstance(value, (list, set, tuple,)):
            rebuild = []
            for item in value:
                jsonified = self.__jsonify(item, warn=warn)
                if jsonified is not None:
                    rebuild.append(jsonified)
            return rebuild
        if callable(value):
            if warn:
                # not every callable has a ``__name__`` (e.g. partials)
                callable_name = getattr(value, '__name__', repr(value))
                warnings.warn((
                    "callable '{callable_name}' in instance {self} detected, "
                    "built instances from generated json will not contain the "
                    "corresponding action"
                ).format(callable_name=callable_name, self=self), UserWarning)
            return None
        return value

    def __row_filter(self, record, normalized=False):
        """ Default row filter callable.

        :param collections.OrderedDict record: An ordered dictionary of
            (``column_name``, ``row_value``) items
        :param bool normalized: A flag which indicates if the row filter
            call is being applied *pre* or *post* data normalization.
            (True indicates post-normalization)
        :returns: True
        :rtype: bool
        """

        return True

    @property
    def name(self):
        """ The descriptive name of the SandPaper instance.

        .. note:: If no name has been given, a continually updating uid hash
            of the active rules is used instead

        :getter: Returns the given or suitable name for a SandPaper instance
        :setter: Sets the descriptive name of the SandPaper instance
        :rtype: str
        """

        if not hasattr(self, '_name'):
            return self.uid
        return self._name

    @name.setter
    def name(self, name):
        """ Sets the descriptive name of the SandPaper instance.

        :param str name: A descriptive name for the SandPaper instance
        :returns: Nothing
        """

        # NOTE: kept as an assertion so callers catching AssertionError keep
        # working, be aware that assertions are skipped when running with -O
        assert isinstance(name, six.string_types) and len(name) > 0, (
            'name expected a string of positive length, received "{name}"'
        ).format(**locals())
        self._name = name

    @property
    def uid(self):
        """ A continually updating hash of the active rules.

        A hexadecimal sha1 digest string built from the name and the
        jsonified arguments of every registered rule, in registration order.

        :getter: Returns a continually updating hash of the active rules
        :rtype: str
        """

        hasher = hashlib.sha1()
        for (rule, rule_args, rule_kwargs,) in self.rules:
            hasher.update((
                "{rule.__name__}({args}, {kwargs})"
            ).format(
                rule=rule,
                args=self.__jsonify(rule_args),
                kwargs=self.__jsonify(rule_kwargs)
            ).encode('utf-8'))
        return hasher.hexdigest()

    @property
    def rules(self):
        """ This list of applicable rules for the SandPaper instance.

        The list is created lazily on first access.

        :getter: Returns the list of applicable rules for the instance
        :rtype: list[tuple(callable, tuple(....,....), dict[str,....])]
        """

        if not hasattr(self, '_rules'):
            self._rules = []
        return self._rules

    @property
    def value_rules(self):
        """ The set of value rules for the SandPaper instance.

        The set is created lazily on first access.

        :getter: Returns the set rules for the SandPaper instance
        :rtype: set(callable)
        """

        if not hasattr(self, '_value_rules'):
            self._value_rules = set()
        return self._value_rules

    @property
    def record_rules(self):
        """ The set of record rules for the SandPaper instance.

        The set is created lazily on first access.

        :getter: Returns the set rules for the SandPaper instance
        :rtype: set(callable)
        """

        if not hasattr(self, '_record_rules'):
            self._record_rules = set()
        return self._record_rules

    def _filter_values(
        self, record,
        column_filter=None, value_filter=None, callable_filter=None,
        **kwargs
    ):
        """ Yield only allowed (column, value) pairs using supported filters.

        :param collections.OrderedDict record: An ordered dictionary of
            (``column_name``, ``row_value``) items
        :param column_filter: A regular expression (compiled or as a string)
            that ``column_name`` has to match
        :param value_filter: A regular expression (compiled or as a string)
            that ``row_value`` has to match
        :param callable callable_filter: An truthy evaluated callable which
            receives the record, the column name and ``kwargs``
        :param dict kwargs: Any named arguments, for the kwargs of
            ``callable_filter``
        :returns: A generator yielding allowed (column, value) pairs
        """

        # filters are usually precompiled by the caller, but plain pattern
        # strings are accepted as well instead of failing on ``.match``
        if column_filter is not None and not hasattr(column_filter, 'match'):
            column_filter = regex.compile(column_filter)
        if value_filter is not None and not hasattr(value_filter, 'match'):
            value_filter = regex.compile(value_filter)

        for (column, value) in record.items():
            if column_filter is not None:
                if not column_filter.match(str(column)):
                    continue
            if value_filter is not None:
                if not value_filter.match(str(value)):
                    continue
            if callable(callable_filter):
                if not callable_filter(record, column, **kwargs):
                    continue
            yield (column, value,)

    def _apply_rules(
        self, from_file,
        sheet_name=None, row_filter=None, monitor_rules=False,
        **kwargs
    ):
        """ Base rule application method.

        :param str from_file: The file to apply rules to
        :param str sheet_name: The name of the sheet to apply rules to
        :param callable row_filter: A callable which accepts a record and a
            ``normalized`` flag and returns True if the record should be
            processed (``normalized=False``) and written out
            (``normalized=True``)
        :param bool monitor_rules: Boolean flag that indicates if the count
            of applied rules should be monitored
        :param dict kwargs: Any named arguments, for the reading of the file
        :returns: Yields normalized records
        """

        # TODO: find a better way of allow the user to keep specific rows
        # instead of using a callable which cannot be serialized to json
        if not callable(row_filter):
            row_filter = self.__row_filter

        for record in pyexcel.iget_records(
            file_name=from_file,
            sheet_name=sheet_name,
            **kwargs
        ):
            # row filtering done pre record normalization
            if not row_filter(record, normalized=False):
                continue

            # start application of all registered rules
            for (rule, rule_args, rule_kwargs,) in self.rules:
                if monitor_rules and rule.__name__ not in \
                        self.__rule_stats:
                    self.__rule_stats[rule.__name__] = 0

                if rule in self.value_rules:
                    # value rules are required to pass filtering
                    for (column, value,) in self._filter_values(
                        record, **rule_kwargs
                    ):
                        # handle application of value rule, the rule gets a
                        # copy so it cannot modify the live record
                        record[column] = rule(
                            self, record.copy(), column,
                            *rule_args, **rule_kwargs
                        )
                        if monitor_rules:
                            self.__rule_stats[rule.__name__] += 1
                else:
                    # handle application of record rule
                    record = rule(
                        self, record.copy(),
                        *rule_args, **rule_kwargs
                    )
                    if monitor_rules:
                        self.__rule_stats[rule.__name__] += 1

            # row filtering done post record normalization
            if row_filter(record, normalized=True):
                yield record

    def _apply_to(
        self, from_file, to_file,
        sheet_name=None, row_filter=None, monitor_rules=False,
        **kwargs
    ):
        """ Threadable rule processing method.

        .. important:: No overwrite protection is enabled for this method.
            If the ``from_file`` is equal to the ``to_file``, then
            ``from_file`` will be overwritten.

        :param str from_file: The input filepath
        :param str to_file: The output filepath
        :param str sheet_name: The name of the sheet to apply rules to
        :param callable row_filter: A callable which accepts a cleaned record
            and returns True if the record should be written out
        :param bool monitor_rules: Boolean flag that indicates if the count
            of applied rules should be monitored
        :param dict kwargs: Any named arguments, passed to ``_apply_rules``
        :returns: The rule statistics if ``monitor_rules`` is true
        :rtype: dict[str, int]
        """

        # always start from a fresh instance-level dictionary, otherwise the
        # first run would write into the dictionary shared on the class
        self.__rule_stats = {}
        try:
            pyexcel.isave_as(
                records=self._apply_rules(
                    from_file,
                    sheet_name=sheet_name,
                    row_filter=row_filter,
                    monitor_rules=monitor_rules,
                    **kwargs
                ),
                dest_file_name=to_file,
                dest_lineterminator=os.linesep,
            )
            if monitor_rules:
                return self.__rule_stats
        finally:
            # the returned dictionary stays intact, only the reference held
            # by the instance is replaced
            self.__rule_stats = {}
[docs] @value_rule def lower(self, record, column, **kwargs): """ A basic lowercase rule for a given value. Only applies to text type variables :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param str column: A column that indicates what value to normalize :param dict kwargs: Any named arguments :returns: The value lowercased """ value = record[column] return ( value.lower() if isinstance(value, six.string_types) else value )
[docs] @value_rule def upper(self, record, column, **kwargs): """ A basic uppercase rule for a given value. Only applies to text type variables :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param str column: A column that indicates what value to normalize :param dict kwargs: Any named arguments :returns: The value uppercased """ value = record[column] return ( value.upper() if isinstance(value, six.string_types) else value )
[docs] @value_rule def capitalize(self, record, column, **kwargs): """ A basic capitalization rule for a given value. Only applies to text type variables :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param str column: A column that indicates what value to normalize :param dict kwargs: Any named arguments :returns: The value capatilized """ value = record[column] return ( value.capitalize() if isinstance(value, six.string_types) else value )
[docs] @value_rule def title(self, record, column, **kwargs): """ A basic titlecase rule for a given value. Only applies to text type variables :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param str column: A column that indicates what value to normalize :param dict kwargs: Any named arguments :returns: The value titlecased """ value = record[column] return ( value.title() if isinstance(value, six.string_types) else value )
[docs] @value_rule def lstrip(self, record, column, content=None, **kwargs): """ A basic lstrip rule for a given value. Only applies to text type variables. :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param str column: A column that indicates what value to normalize :param str content: The content to strip (defaults to whitespace) :param dict kwargs: Any named arguments :returns: The value with left content stripped """ value = record[column] return ( value.lstrip(content) if isinstance(value, six.string_types) else value )
[docs] @value_rule def rstrip(self, record, column, content=None, **kwargs): """ A basic rstrip rule for a given value. Only applies to text type variables. :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param str column: A column that indicates what value to normalize :param str content: The content to strip (defaults to whitespace) :param dict kwargs: Any named arguments :returns: The value with right content stripped """ value = record[column] return ( value.rstrip(content) if isinstance(value, six.string_types) else value )
[docs] @value_rule def strip(self, record, column, content=None, **kwargs): """ A basic strip rule for a given value. Only applies to text type variables. :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param str column: A column that indicates what value to normalize :param str content: The content to strip (defaults to whitespace) :param dict kwargs: Any named arguments :returns: The value with all content stripped """ value = record[column] return ( value.strip(content) if isinstance(value, six.string_types) else value )
[docs] @value_rule def increment( self, record, column, amount=1, **kwargs ): """ A basic increment rule for a given value. Only applies to numeric (int, float) type variables. :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param str column: A column that indicates what value to normalize :param amount: The amount to increment by :type amount: int or float :param dict kwargs: Any named arguments :returns: The value incremented by ``amount`` """ value = record[column] if isinstance(value, (int, float,)): return (value + amount) return value
[docs] @value_rule def decrement( self, record, column, amount=1, **kwargs ): """ A basic decrement rule for a given value. Only applies to numeric (int, float) type variables. :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param str column: A column that indicates what value to normalize :param amount: The amount to decrement by :type amount: int or float :param dict kwargs: Any named arguments :returns: The value incremented by ``amount`` """ value = record[column] if isinstance(value, (int, float,)): return (value - amount) return value
[docs] @value_rule def replace( self, record, column, replacements, **kwargs ): """ Applies a replacements dictionary to a value. Take for example the following SandPaper instance: .. code-block:: python s = SandPaper('my-sandpaper').replace({ 'this_is_going_to_be_replaced': 'with_this', }) :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param str column: A column that indicates what value to normalize :param replacements: A dictionary of replacements for the value :type replacements: dict[str, str] :param dict kwargs: Any named arguments :returns: The value with all replacements made """ value = record[column] if isinstance(value, six.string_types): for (from_text, to_text,) in replacements.items(): value = value.replace(from_text, to_text) return value
[docs] @value_rule def translate_text( self, record, column, translations, **kwargs ): """ A text translation rule for a given value. Take for example the following SandPaper instance: .. code-block:: python s = SandPaper('my-sandpaper').translate_text({ r'^group(?P<group_id>\d+)\s*(.*)$': '{group_id}' }, column_filter=r'^group_definition$') This will translate all instances of the value ``group<GROUP NUMBER>`` to ``<GROUP NUMBER>`` only in columns named ``group_definition``. .. important:: Note that matched groups and matched groupdicts are passed as ``*args`` and ``**kwargs`` to the format method of the returned ``to_format`` string. :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param str column: A column that indicates what value to normalize :param translations: A dictionary of translations the value :type translations: dict[str, str] :param dict kwargs: Any named arguments :returns: The potentially translated value """ value = record[column] for (from_regex, to_format,) in translations.items(): match = regex.match(from_regex, str(value)) if match is not None: # NOTE: Would prefer to use PEP448, but have to do this for PY2 named_groups = kwargs.copy() named_groups.update(match.groupdict()) value = to_format.format( *[ (capture if capture is not None else '') for capture in match.groups() ], **{ name: (capture if capture is not None else '') for (name, capture) in named_groups.items() } ) return value
[docs] @value_rule def translate_date( self, record, column, translations, **kwargs ): """ A date translation rule for a given value. Take for example the following SandPaper instance: .. code-block:: python s = SandPaper('my-sandpaper').translate_date({ '%Y-%m-%d': '%Y', '%Y': '%Y', '%Y-%m': '%Y' }, column_filter=r'^(.*)_date$') This will translate all instances of a date value matching the given date formats in columns ending with ``_date`` to the date format ``%Y``. :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param str column: A column that indicates what value to normalize :param translations: A dictionary of translations from an arrow based dateformats to a different format :type translations: dict[str, str] :param dict kwargs: Any named arguments :returns: The value potentially translated value """ value = record[column] if isinstance(value, (datetime.date, datetime.datetime,)): # FIXME: This isn't my fault but it needs to be fixed # pyexcel shouldn't detect this datetime with the __default_apply # parameters implicitly passed, but it does... return value.strftime(list(translations.values())[0]) for (from_format, to_format,) in translations.items(): try: return datetime.datetime.strptime( value, from_format ).strftime(to_format) except ValueError: continue return value
[docs] @record_rule def add_columns(self, record, additions, **kwargs): """ Adds columns to a record. .. note:: If the value of an entry in ``additions`` is a callable, then the callable should expect the ``record`` as the only parameter and should return the value that should be placed in the newly added column. If the value of an entry in ``additions`` is a string, the record is passed in as kwargs to the value's ``format`` method. Otherwise, the value of an entry in ``additions`` is simply used as the newly added column's value. :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param additions: A dictionary of column names to callables, strings, or other values :type additions: dict[str,....] :param dict kwargs: Any named arguments :returns: The record with a potential newly added column """ for (name, value,) in additions.items(): if name in record: continue if callable(value): record[name] = value(record) elif isinstance(value, six.string_types): record[name] = value.format(**record) else: record[name] = value return record
[docs] @record_rule def remove_columns(self, record, removes, **kwargs): """ Removes columns from a record. :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param removes: A list of columns to remove :type removes: list[str] :param dict kwargs: Any named arguments :returns: The record with a potential newly removed column """ for name in removes: if name in record: del record[name] return record
[docs] @record_rule def keep_columns(self, record, keeps, **kwargs): """ Removes all other columns from a record. :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param keeps: A list of columns to keep :type keeps: list[str] :param dict kwargs: Any named arguments :returns: The record with a potential newly kept column """ try: new_record = record.copy() for column_name in record: if column_name not in keeps: del new_record[column_name] return new_record finally: # memory removal of unused record (shouldn't cause issues) del record
[docs] @record_rule def rename_columns( self, record, renames, **kwargs ): """ Maps an existing column to a new column. :param collections.OrderedDict record: A record whose value within ``column`` should be normalized and returned :param renames: A dictionary of column to column renames :type renames: dict[str, str] :param dict kwargs: Any named arguments :returns: The record with the remapped column """ # full OrderedDict rebuild required for column renaming return collections.OrderedDict([( (renames[key] if key in renames else key), value, ) for (key, value,) in record.items()])
[docs] @record_rule def order_columns( self, record, order, ignore_missing=False, **kwargs ): """ Orders columns in a specific order. :param collections.OrderedDict record: A record who should be ordered :param order: The order that columns need to be in :type order: list[str] :param bool ignore_missing: Boolean which inidicates if missing columns from ``order`` should be ignored :param dict kwargs: Any named arguments :returns: The record with the columns reordered """ ordered_record = collections.OrderedDict([ (column_name, record[column_name],) for column_name in order if column_name in record ]) if not ignore_missing: for column_name in record: if column_name not in order: ordered_record[column_name] = record[column_name] return ordered_record
[docs] def apply( self, from_file, to_file, sheet_name=None, row_filter=None, monitor_rules=False, **kwargs ): """ Applies a SandPaper instance rules to a given glob of files. :param str from_file: The path of the file to apply the rules to :param str to_file: The path of the file to write to :param str sheet_name: The name of the sheet to apply rules to (defaults to the first available sheet) :param callable row_filter: A callable which accepts a cleaned record and returns True if the record should be written out :param bool monitor_rules: Boolean flag that inidicates if the count of applied rules should be monitored :param dict kwargs: Any additional named arguments (applied to the pyexcel ``iget_records`` method) :returns: The rule statistics if ``monitor_rules`` is true :rtype: dict[str, int] """ # precompile filter regexes (kinda speeds up the processing) for (rule, rule_args, rule_kwargs,) in self.rules: for (key, value,) in rule_kwargs.items(): if key in self.__available_filters and \ isinstance(value, six.string_types): rule_kwargs[key] = regex.compile(value) try: return self._apply_to( from_file, to_file, sheet_name=sheet_name, row_filter=row_filter, monitor_rules=monitor_rules, **dict(self.__default_apply, **kwargs) ) finally: pyexcel.free_resources()
[docs] @classmethod def from_json(cls, serialization): """ Loads a SandPaper instance from a json serialization. .. note:: Raises a ``UserWarning`` when the loaded instance does not match the serialized instance's ``uid``. :param dict serialization: The read json serialization :returns: A new SandPaper instance :rtype: SandPaper """ paper = ( cls(serialization['name']) if serialization['name'] != serialization['uid'] else cls() ) for ( rule_name, rule_args, rule_kwargs, ) in serialization['rules']: getattr(paper, rule_name)(*rule_args, **rule_kwargs) if paper.uid != serialization['uid']: warnings.warn(( "loaded instance {paper} does not match serialization uid " "'{serialization[uid]}', serialized instance most likely " "cannot be fully serialized" ).format(**locals()), UserWarning) return paper