Source code for slacklog.parsers

"""
SlackLog parsers
================

SlackLog parser reads a Slackware ChangeLog.txt and builds an in-memory representation of it using SlackLog models.
"""
from __future__ import print_function

import re
import hashlib
from dateutil import parser
from dateutil import tz
from slacklog.models import SlackLog, SlackLogEntry, SlackLogPkg
from codecs import encode

try:
    str = unicode
except NameError:
    pass  # Forward compatibility with Py3k (unicode is not defined)

# pkg name starts from the beginning of line, and there's a colon followed by
# a double space.  But description can also contain "something:  ",
# so "something" should contain either a slash or a dot for it to look like
# a file name.
pkg_name_re = re.compile(r'\A[-a-zA-Z0-9_]+[/.][-a-zA-Z0-9_+/.]*[*]?:  ')

tzinfos = {
    'CDT': -5 * 60 * 60,
    'CST': -6 * 60 * 60,
    'UTC': 0,
    }


[docs]class SlackLogParser (object): """ Parser for recent (13.x) Slackware ChangeLogs. """ def __init__(self): self.quiet = False """If :py:const:`True`, warnings about date parsing are not printed.""" self.min_date = None """If set to a :py:class:`datetime.datetime` object, older log entries are ignored (not parsed).""" self.ENTRY = 0 """Counter of entries (for debugging).""" self.PKG = 0 """Counter of packages (for debugging)."""
[docs] def parse(self, data): """ Return the in-memory representation of the data. :param unicode data: the ChangeLog.txt content. :returns: in-memory representation of data :rtype: :py:class:`slacklog.models.SlackLog` """ assert(isinstance(data, str)) log = SlackLog() log.startsWithSeparator = re.match('\A(\+-+\+[\n]?)', data) log.endsWithSeparator = re.search('[\n](\+-+\+[\n]?)\Z', data) if log.startsWithSeparator: data = data[log.startsWithSeparator.start():] log.startsWithSeparator = True else: log.startsWithSeparator = False if log.endsWithSeparator: data = data[:log.endsWithSeparator.start(1)] log.endsWithSeparator = True else: log.endsWithSeparator = False for entry_data in self.split_log_to_entries(data): entry = self.parse_entry(entry_data, log) if entry: log.entries.insert(0, entry) return log
[docs] def split_log_to_entries(self, data): """ Split the ChangeLog.txt into a list of unparsed entries. :param unicode data: the ChangeLog.txt content. :returns: list of unparsed entries, separators removed. :rtype: [:py:class:`unicode`] """ assert(isinstance(data, str)) raw_entries = re.split('\+-+\+', data) entries = [] for entry in raw_entries: entry = entry.lstrip() if entry and entry != "": entries.append(entry) entries.reverse() return entries
[docs] def parse_entry(self, data, log): """ Parse a single ChangeLog entry. :param unicode data: ChangeLog entry content. :param log: in-memory representation that is being parsed. :type: :py:class:`slacklog.models.SlackLog` :return: in-memory representation of the ChangeLog entry. :rtype: :py:class:`slacklog.models.SlackLogEntry` """ assert(isinstance(data, str)) assert(isinstance(log, SlackLog)) self.ENTRY += 1 self.PKG = 0 checksum = self.gen_entry_checksum(data) parent = None if log.entries: parent = log.entries[0].identifier identifier = self.gen_entry_identifier(data, checksum, parent) else: identifier = self.gen_entry_identifier(data, checksum, None) timestamp, timezone, data = self.parse_entry_timestamp(data) if self.min_date and self.min_date > timestamp: return None description, data = self.parse_entry_description(data) entry = SlackLogEntry(timestamp, description, log, checksum=checksum, identifier=identifier, parent=parent, timezone=timezone) for pkg_data in self.split_entry_to_pkgs(data): pkg = self.parse_pkg(pkg_data, entry) entry.pkgs.append(pkg) return entry
[docs] def gen_entry_checksum(self, data): """ Generate ChangeLog entry checksum from data. :param unicode data: ChangeLog entry content. :return: Entry checksum. :rtype: :py:class:`unicode` """ assert(isinstance(data, str)) return u'%s' % hashlib.sha512(encode(data, 'utf-8')).hexdigest()
[docs] def gen_entry_identifier(self, data, checksum, parent): """ Generate ChangeLog entry identifier from data, checksum, and/or parent identifier. :param unicode data: ChangeLog entry content. :param unicode checksum: ChangeLog entry checksum. :param parent: Parent entry identifier or :py:const:`None` :return: Entry identifier. :rtype: :py:class:`unicode` """ if parent is not None: return u'%s' % hashlib.sha512(encode(parent + checksum, 'utf-8')).hexdigest() return u'%s' % hashlib.sha512(encode(checksum, 'utf-8')).hexdigest()
[docs] def parse_entry_timestamp(self, data): """ Parse ChangeLog entry timestamp from data. :param unicode data: ChangeLog entry content. :returns: a two element list: timestamp and the rest of the entry. :rtype: [:py:class:`datetime.datetime`, :py:class:`unicode`] """ assert(isinstance(data, str)) timestamp_str, data = self.get_line(data) timestamp, timezone = self.parse_date_with_timezone(timestamp_str) return [timestamp, timezone, data]
[docs] def parse_entry_description(self, data): """ Parse ChangeLog entry description from data. :param unicode data: ChangeLog entry content (without timestamp). :returns: a two element list: description and the rest of the entry. :rtype: [:py:class:`unicode`, :py:class:`unicode`] """ assert(isinstance(data, str)) description = u'' while data and not pkg_name_re.match(data): line, data = self.get_line(data) description += line return [description, data]
[docs] def split_entry_to_pkgs(self, data): """ Split ChangeLog entry content into a list of unparsed packages. :param unicode data: ChangeLog entry content (without timestamp or description). :return: a list of unparsed packages. :rtype: [:py:class:`unicode`] """ assert(isinstance(data, str)) pkgs = [] pkg_lines = [] if data == u'' or data == u'\n': return [] for line in data.split('\n'): if not pkg_name_re.match(line): pkg_lines.append(line) else: if pkg_lines: # pkg_lines is not the last package in # the entry: add an extra newline pkgs.append('\n'.join(pkg_lines) + '\n') pkg_lines = [] if line: pkg_lines.append(line) if pkg_lines: # last package in the entry: no extra newline pkgs.append('\n'.join(pkg_lines)) return pkgs
[docs] def parse_pkg(self, data, entry): """ Parse a single package. :param unicode data: Package name and description of the update. :param entry: in-memory representation of the ChangeLog entry being parsed. :type: :py:class:`slacklog.models.SlackLogEntry` :return: in-memory representation of the package. :rtype: :py:class:`slacklog.models.SlackLogPkg` """ assert(isinstance(data, str)) assert(isinstance(entry, SlackLogEntry)) self.PKG += 1 try: pkg, data = self.parse_pkg_name(data) except ValueError: print("data: '%s...'" % data[0:50]) raise description = self.parse_pkg_description(data) return SlackLogPkg(pkg, description, entry)
[docs] def parse_pkg_name(self, data): """ Parse package name from a package. :param unicode data: Package name and description. :return: a two element list: package name and package description. :rtype: [:py:class:`unicode`, :py:class:`unicode`] """ assert(isinstance(data, str)) return data.split(u':', 1)
[docs] def parse_pkg_description(self, data): """ Parse package description from a package. :param unicode data: Package description. :return: Package description. :rtype: :py:class:`unicode` """ assert(isinstance(data, str)) return data
[docs] def get_line(self, data): """ Consume one line from data. :param unicode data: Data. :return: a two element list: first line, rest of the data. :rtype: [:py:class:`unicode`, :py:class:`unicode`] """ assert(isinstance(data, str)) try: line, data = data.split(u'\n', 1) line += u'\n' except ValueError: # No newlines line = data data = u'' return [line, data]
[docs] def parse_date(self, data): """ Parse a time string into a timestamp. :param unicode data: Time string. :return: Timestamp in UTC timezone. :rtype: :py:class:`datetime.datetime` """ if data is None: return None timestamp, timezone = self.parse_date_with_timezone(data) return timestamp
[docs] def parse_date_with_timezone(self, data): """ Parse a time string into a timestamp. :param unicode data: Time string. :return: a two element list: Timestamp in UTC timezone, and the original timezone. :rtype: :py:class:`datetime.datetime` """ if data is None: return None assert(isinstance(data, str)) timestamp = parser.parse(data, tzinfos=tzinfos) timezone = timestamp.tzinfo if timezone is None: # Timestamp was ambiguous, assume UTC if not self.quiet: from sys import stderr stderr.write("Warning: Assuming UTC, input was '%s'" % data) timestamp = timestamp.replace(tzinfo=tz.tzutc()) elif timestamp.tzinfo.utcoffset(timestamp).total_seconds() != 0: # Timestamp was in some local timezone, # convert to UTC tzname = timezone.tzname(timestamp) if not self.quiet and tzname not in tzinfos: from sys import stderr stderr.write("Warning: Converting '%s' to UTC" % tzname) timestamp = timestamp.astimezone(tz.tzutc()) return [timestamp, timezone]