Russell Bateman December 2014 last update:
Nothing's wrong with Python. It's the best and most amazing scripting language.
However, as an application programming language, it simply isn't.
Python is able to maintain multiple threads...
#!/usr/bin/python import thread What's wrong with Python? import time # define a function for the thread def print_time( threadName, delay): count = 0 while count < 5: time.sleep(delay) count += 1 print "%s: %s" % ( threadName, time.ctime(time.time()) ) # create two threads as follows try: thread.start_new_thread( print_time, ("Thread-1", 2, ) ) thread.start_new_thread( print_time, ("Thread-2", 4, ) ) except: print "Error: unable to start thread" while 1: pass
However, Python's interpreter cannot execute threads simultaneously: it possesses a global interpreter lock (GIL), and a thread can execute compiled code only while it holds that lock.
Additionally, the Python interpreter performs garbage collection.
GUnicorn (or "green unicorn") is a Python web server gateway interface HTTP server, ported from Ruby's Unicorn project, that can be used to implement HTTP services in Python. However, it doesn't multiprocess. GUnicorn is based on a prefork worker model in which there is a central master process that manages a set of worker processes. Only one executes at a time.
I want to make a crib sheet (better than I once did for Ruby back in the day)
def function( [ arguments ] ): class Class( [ heir-to ] ): pass self yield __init__
from_keyboard = raw_input( [ prompt ] ) string = "Legion" print "My name is %s", string print( "My name is %s." ) % string
file = open( filename [, mode ] ) print( file.read() ) file.truncate() file.write( data ) file.close() exists( filename )
assert raise except value-error , value : statement finally :
with ... as : exec global variable from package import symbol [ as alias ] import package lambda
list = [ first-value , second-value , third-value , ... ] one = variable[ which ] join pop
if condition : if condition : statements else: statements if condition : statements elif condition : statements else: statements statement-if-true if( condition ) else statement-if-false not and or is
for variable in collection : while condition : break continue
Despite what you see below, the Python community is very reluctant to adopt Python 3. It's because there are very sore incompatibilities.
Python 0.9 - February 1991 Python 1.0 - January 1994 Python 1.5 - December 31, 1997 Python 1.6 - September 5, 2000 Python 2.0 - October 16, 2000 Python 2.1 - April 17, 2001 Python 2.2 - December 21, 2001 Python 2.3 - July 29, 2003 Python 2.4 - November 30, 2004 Python 2.5 - September 19, 2006 Python 2.6 - October 1, 2008 Python 3.0 - December 3, 2008 Python 3.1 - June 27, 2009 Python 2.7 - July 3, 2010 --------------------- last release of Python 2 Python 3.2 - February 20, 2011 Python 3.3 - September 29, 2012 Python 3.4 - March 16, 2014
I keep this as a Tomboy note on my Linux desktop.
# String is empty or not if string: print 'String not empty' elif not string: print 'String is empty' # Trim white space from beginning or ending of string string.lstrip() string.rstrip() # Truncate string string = 'cats and dogs' firstHalf = string[ :4 ] # after column 4 ('cats') secondHalf = string[ 4: ] # up to column 4 (' and dogs') pos = string.find( 'x' ) string[ 0:pos+1 ] # after first 'x' # Snip string string[ 5: ] # lose up to column 5 string[ :-2 ] # lose last two characters # Prepend string to beginning result = prefix + string # Split string on character(s) string = 'This=is=a=test' stringList = string.split( '=' ) stringList = [ 'This', 'is', 'a', 'test' ] # Strip file extension import os.path name, extension = os.path.splitext( filename ) ext = os.path.splitext( filename )[ 0 ] # get extension # Match beginning or end of string string.startswith( 'match' ) string.endswith( 'match' ) # Find match in string match in string: True # Find match from beginning or end of string string.find( match ) string.rfind( match ) # Prepend string to beginning result = prefix + string # Convert number to string result = prefix + string # Ensure string is numeric string.isdigit() # Ensure string is numeric string.isdigit() # Make list from CSV string string = 'a, b, c' list = string.split( ',' ) # Make CSV string from list list = [ 'a', 'b', 'c' ] stringList = ', '.join( list ) # Java StringBuilder-like assembly from cStringIO import StringIO buffer = StringIO() print && buffer, 'This is %s string builder' % 'the best' print && buffer, ' of all time!' result = buffer.getvalue()
# Python load module def loadModule( path ): name, extension = os.path.splitext( os.path.split( path ) [ -1 ] ) module = imp.load_source( name, path ) return module
An array in Python is called a list, but that the identifier list is reserved. array[ '1', '2', ... 'N' ] # definition/initialization array = [] # ontology element in array # assignment array[ element ] = newelement # adding array.append( element ) # deleting del array[ element ] # traversal, [...] denoting optional functionality below for element in array: print element for index[, element] in range( len( array ) ): print index[, element]
A hashmap is called a dictionary. dict is a reserved keyword. dictionary[ 'key1' : 'value', 'key2' : 'value', ... 'keyN' : 'value' ] # definition/initialization dictionary = {} # ontology key in dictionary # assignment dictionary[ key ] = newvalue # adding/creating dictionary[ newkey ] = value # deleting del dictionary[ key ] dictionary.pop( key ) # traversal for key in dictionary: print key, dictionary[ key ]
Python SQL with SQLAlchemy DATABASE_PATH = 'mysql:///database.db' # getting started... Engine = sqlalchemy.create_engine( DATABASE_PATH ) DBSession = sqlalchemy.orm.sessionmaker( bind=Engine ) Session = DBSession() Session.bind = Engine # insert object; ObjectClass has a table name... object = ObjectClass( __init__() arguments ) (and/or fill out object) Session.add( object ) INSERT INTO tablename ( fields... ) Session.commit() VALUES( values... ); # query one (first) object... qo = Session.query( ObjectClass ) SELECT * from tablename qf = qo.filter( ObjectClass.field == fieldvalue ) WHERE field = fieldvalue; return qf.first() # query multiple (list of) objects... qo = Session.query( ObjectClass ) SELECT * from tablename qf = qo.filter( ObjectClass.field == fieldvalue ) WHERE field = fieldvalue; return qf
import xml.sax def getResult(): result = httpclient.get( uri ) payload = result.read() resultIdParser = CqPostResponsePayload() try: xml.sax.parseString( payload, resultIdParser ) except Exception as e: print e.message return resultIdParser.getResultId() class CqPostResponsePayload( xml.sax.ContentHandler ): ''' Parse response payload, looks something like: fWkcTS1a ''' def __init__( self ): self.result = StringIO() self.resultIdCharacters = '' def getResultId( self ): return self.result.getvalue().lstrip().rstrip() def startElement( self, tag, attributes=None ): if tag == 'result_id': self.resultIdCharacters = '' else: pass def endElement( self, tag ): if tag == 'result_id': # tie off the result_id... print && self.result, self.resultIdCharacters else: pass def characters(self, content ): self.resultIdCharacters += content -------------------------------------------------------------- def getValueOfTotalAttribute( line ): ''' Just the attributes. ''' parser = HitsTagElementParser() try: xml.sax.parseString( line, parser ) except Exception as e: print e.message return 0 attributes = parser.getAttributes() return attributes class HitsTagElementParser( xml.sax.ContentHandler ): def __init__( self ): self.attributes = {} def getAttributes( self ): return self.attributes def startElement( self, tag, attributes=None ): if tag != 'our-tag': return self.attributes = attributes def endElement( self, tag ): ''' We'll never hit this! ''' pass def characters( self, content ): ''' We're uninterested in this. ''' pass
Python tuples t1 = () red = ( 'red', ) # trick to get a single value classes = ( 'physics', 'chemistry', 1997, 2000 ) mix = red + classes stuff = [ 'this', 'that', 'and', 'the', 'other' ] a = ( 1, 2, 3 ) b = a + ( 4, 5, 6 ) options = () Functions cmp( t1, t2 ) len( t ) max( t ) min( t ) tuple( list ) # tuple from list list( tuple ) # list from tuple Examples >>> classes[ 1 ] 'chemistry' >>> mix ('red', 'physics', 'chemistry', 1997, 2000) >>> list( mix ) ['red', 'physics', 'chemistry', 1997, 2000] >>> tuple( stuff ) ('this', 'that', 'and', 'the', 'other') >>> b (1, 2, 3, 4, 5, 6) >>> options += ( '--all', ) >>> options ('--all',) >>> options += ( '--xpath', ) >>> options ('--all', '--xpath')
Python argument parsing Command line script.py --zoo keeper --ticket cat dog mouse Output (what will be in args) Namespace( ticket=True, filenames=[ 'cat', 'dog', 'mouse' ], zoo='keeper' ) Code import argparse try: parser = argparse.ArgumentParser( description='Parse arguments' ) parser.add_argument( 'filenames', nargs='*' ) # multiple arguments parser.add_argument( '--zoo' ) # prompting ( +argument) parser.add_argument( '--ticket', action='store_true' ) # ontological args = parser.parse_args() print( args ) except Exception as e: print( 'Oops: %s' % str( e ) ) Handy function def optionPresent( args, option ): return option in args and args[ option ]
I'm starting to develop a methodology for solving Python package problems:
import os print os.environ[ "PATH" ] print os.environ[ "PYTHONPATH" ] print os.getcwd()
Here are some examples. Please note that the project directory and the Python code root (not named src) share the same name.
~/dev $ tree artman artman +-- artman | +-- artman.py | +-- __init__.py | +-- init.py | +-- test | | +-- artmanTest.py | | +-- __init__.py | | `-- utilsTest.py | `-- utils.py `-- README.md 2 directories, 8 files
You could think of it this way:
~/dev $ tree artman artman_dir +-- artman_pkg | +-- artman.py | +-- __init__.py | +-- init.py | +-- test | | +-- artmanTest.py | | +-- __init__.py | | `-- utilsTest.py | `-- utils.py `-- README.md 2 directories, 8 files
Here's another example:
~/code/sandman $ tree . +-- LICENSE +-- README.md +-- TODO.md +-- docs | +-- conf.py | +-- generated | +-- index.rst | +-- installation.rst | +-- modules.rst | +-- quickstart.rst | `-- sandman.rst +-- requirements.txt +-- sandman | +-- __init__.py | +-- exception.py | +-- model.py | +-- sandman.py | `-- test | +-- models.py | `-- test_sandman.py `-- setup.py
Note that, the way I'm using this, I don't need (or want) most of the lines in endElement() and characters(), but I left them there for illustration.
import xml.sax from cStringIO import StringIO SESSION_LIST='''<sessions> <session id="group_searchappliance_indexer@default" state="dormant" users="0" /> <session id="group_searchappliance_system@default" state="dormant" users="0" /> <session id="group_searchappliance_search@default" state="dormant" users="0" /> <session id="group_searchappliance_support@default" state="dormant" users="0" /> <session id="user_searchadmin@default" state="dormant" users="0"> <sessionobj name="testSet" state="dormant" users="0" /> <sessionobj name="__test1__" state="dormant" users="0" /> <sessionobj name="testSet2" state="dormant" users="0" /> <sessionobj name="__test1__" state="dormant" users="0" /> </session> <session id="searchadmin" state="dormant" users="0" /> <session id="group_searchappliance_retriever@default" state="dormant" users="0" /> <session id="group_searchappliance_status@default" state="dormant" users="0" /> <session id="group_searchappliance_inboxes@default" state="dormant" users="0" /> <session id="group_searchappliance_feeder@default" state="dormant" users="0" /> </sessions> ''' class SessionHandler( xml.sax.ContentHandler ): def __init__( self ): self.CurrentData = '' self.id = '' self.state = '' self.users = '' self.result = StringIO() def getResult( self ): return self.result.getvalue() def addToResult( self, string ): print >> self.result, string def startElement( self, tag, attributes ): self.CurrentData = tag if tag == 'session': id = attributes[ 'id' ] state = attributes[ 'state' ] users = attributes[ 'users' ] self.addToResult( id + '( %s, %s )' % ( state, users ) ) def endElement( self, tag ): if self.CurrentData == 'id': print 'id: ', self.id elif self.CurrentData == 'state': print 'state: ', self.state elif self.CurrentData == 'users': print 'users: ', self.users self.CurrentData = '' def characters(self, content ): if self.CurrentData == 'id': print 'id: ', self.id elif self.CurrentData == 'state': print 'state: ', self.state elif self.CurrentData == 
'users': print 'users: ', self.users if ( __name__ == "__main__" ): sh = SessionHandler() xml.sax.parseString( SESSION_LIST, sh ) string = sh.getResult() print string
Output:
group_searchappliance_indexer@default( dormant, 0 ) group_searchappliance_system@default( dormant, 0 ) group_searchappliance_search@default( dormant, 0 ) group_searchappliance_support@default( dormant, 0 ) user_searchadmin@default( dormant, 0 ) searchadmin( dormant, 0 ) group_searchappliance_retriever@default( dormant, 0 ) group_searchappliance_status@default( dormant, 0 ) group_searchappliance_inboxes@default( dormant, 0 ) group_searchappliance_feeder@default( dormant, 0 )
The right, Pythonic way to call functions, especially ones with long argument lists or that are very remote to the code being written is to name the arguments, e.g.:
def query( output_set=None, session_id=None, x=9, etc. )
Called thus:
result = query( session_id=sessionid, output_set=outputset, etc. )
and the arguments can be out of order, missing, etc.
try: do something except: import traceback traceback.print_exc()
Uninitialized complaint, so...
queryType = compound = simple = allow = query = fieldname = description = '' # (add to shut up complaint) # This absolutely must succeed without error... try: queryType = jsonDict[ QUERYTYPE_KEY ] except: pass # ...but don't assert that all these exist because some will and others # won't depending on whether we're parsing a system or UI JSON. try: compound = jsonDict[ COMPOUNDQUERY_KEY ] except: pass try: simple = jsonDict[ SIMPLEQUERY_KEY ] except: pass try: allow = jsonDict[ ALLOWSETS_KEY ] except: pass try: query = jsonDict[ QUERYSTRING_KEY ] except: pass try: fieldname = jsonDict[ FIELDNAME_KEY ] except: pass try: description = jsonDict[ DESCRIPTION_KEY ] except: pass return( queryType, compound, simple, allow, query, fieldname, description ) # (complaint was here)
Unused complaint, so...
try: _, compound, simple, allowSystemSet, query, fieldname, description \ = self.__parseJson( content.replace( '\r\n', '' ) ) assert compound or simple # useful assertions, but they also ensure no unused message(s) assert allowSystemSet assert query assert fieldname assert description except ( KeyError, ValueError, AssertionError ) as error: response.status_int = httplib.INTERNAL_SERVER_ERROR return "500 Internal server error; definition file had (an) error(s) ( %s )" % error.message if compound: queryType = 'c' elif simple: queryType = 'q' content = self.__formatUsersJsonFromData( sessionid, definitionfilename, queryType, allowSystemSet, query, fieldname, description )
Whenever Python moans about the number of arguments, there's a short list of observations to make:
As an example of the last one, I keep seeing the (nigh nonsensical) assertion that the function takes at least 2 arguments, but I've supplied 6. Well, duh! So, it turns out that this function used to be in a class and I lost the class because I didn't need or want it, but I'd forgotten to remove the self argument.
Here's the error:
TypeError: query() takes at least 2 arguments (6 given)
Here's the function statement; it used to be a class "method":
def query( self, query_string, query_type=None, outputset=None, sessionid=None, host=None, port=None ):
And here's the call:
return query( queryString, query_type=queryType, outputset=outputset, sessionid=sessionid, host=self._ss_host, port=self._ss_host )
def deriveLoggingLevel( self, level ):
    '''
    Utility to derive logging level from a string.

    level -- level name such as 'error', 'WARN', 'Debug' or 'trace'; case
             is ignored.

    Returns the matching logging constant. Anything unrecognized, 'info'
    included, falls through to logging.INFO, as does a level that is not a
    string.
    '''
    levels = {                          # sort of a Python switch/case statement
        'ERROR' : logging.ERROR,
        'WARN'  : logging.WARN,
        #'INFO' : logging.INFO,         # (handled by fall-through case)
        'DEBUG' : logging.DEBUG,
        'TRACE' : logging.DEBUG         # (Python logging hasn't kept up)
    }
    try:
        key = level.upper()
    except AttributeError:              # None or some other non-string
        return logging.INFO
    return levels.get( key, logging.INFO )
import collections d = { 'session9' : 0, 'session1' : 0 } d[ 'session9' ] = '{ "blue" : "fun" }' d[ 'session1' ] = '{ "red" : "boredom" }' d[ 'session3' ] = '{ "purple" : "porpoises" }' print d d = collections.OrderedDict( sorted( d.items() ) ) print d result = [] for i,j in d.iteritems(): result.append( '\n "%s" : %s\n' % ( i, j ) ) output = ', '.join( result ) print "{ %s}" % output.replace( '\n', '' )
If class stuff doesn't seem to be remembered, i.e.: __init__()-established variables don't seem to keep the values you gave them, maybe it's because you misspelled the name of this function.
class Thing( object ):
    # Because the name below is not __init__, Python never calls it when a
    # Thing is created, so self.thingy is never set.
    def __init( self ):                 # <------ misspelled!
        self.thingy = SomeThingy()
The class variables are shared across all objects (or "instances") of the class. The instance variables are called "object variables." When list1 is referenced:
c = Class() print c.list1
Python looks for an object variable named list1 first, then goes for the class variable.
class Class( object ): list1 = [] # class variables here—think Java static dict1 = {} name = 'Tony' def __init__( self ): self.var1 = 9 # instance variables here—think Java instance... self.var2 = False
Let's remember how we make a JSON dictionary in Python.
import json # basic assembly... jsonDict = {} # the JSON object key = 'key' # pretend key value = 'value' # pretend value jsonDict[ key ] = value # assemble tuple into object jsonString = json.dumps( jsonDict ) # export to JSON string print jsonString # { "key" : "value" } jsonDict2 = json.loads( jsonString ) # import object from string # export JSON to a file... json.dump( jsonDict2, open( path, 'w' ) ) # import JSON from a file... jsonDict3 = json.load( open( path, 'r' ) )
Just for fun, observe this Python session to help understand things.
>>> import json >>> s = '"testSet" : { "description" : "This is a description." }' >>> print s "testSet" : { "description" : "This is a description." } >>> json.loads( s ) Traceback (most recent call last): File "", line 1, in File "/usr/lib64/python2.7/json/__init__.py", line 338, in loads return _default_decoder.decode(s) File "/usr/lib64/python2.7/json/decoder.py", line 368, in decode raise ValueError(errmsg("Extra data", s, end, len(s))) ValueError: Extra data: line 1 column 11 - line 1 column 57 (char 10 - 56) >>> jsonDict = {} >>> jsonDict[ 'description' ] = 'This is a description.' >>> json.dumps( jsonDict ) '{"description": "This is a description."}' >>> jsonDict2 = {} >>> jsonDict2[ 'testSet' ] = jsonDict >>> json.dumps( jsonDict2 ) '{"testSet": {"description": "This is a description."}}' >>> s2 = json.dumps( jsonDict2 ) >>> json.loads( s2 ) {u'testSet': {u'description': u'This is a description.'}} >>> jsonDict3 = json.loads( s2 ) >>> json.dumps( jsonDict3 ) '{"testSet": {"description": "This is a description."}}'
Illustration of the difference between using class in Python and not using it:
class CState( object ):
    ''' Tiny stateful object: one field changed in place by add() and mult(). '''
    def __init__( self ):
        self.field = 'init'     # field is "instance variable" in Java

    def add( self, x ):
        ''' Append (or add) x to field. '''
        self.field += x

    def mult( self, x ):
        ''' Repeat (or multiply) field x times. '''
        self.field *= x

s = CState()                    # the class is CState, not State
s.add( 'added' )                # self is implicitly passed in
s.mult( 2 )                     # ibid
print( s.field )
~ $ python Python 2.7.5 (default, Nov 3 2014, 14:26:24) >>> from state import CState >>> s = CState() >>> print s >>> s.add( 'added' ) >>> print s >>> print s.field initadded >>> s.mult( 2 ) >>> print s.field initaddedinitadded
Without class and self:
def state_init( state ): state[ 'field' ] = 'init' def state_add(state, x ): state[ 'field' ] += x def state_mult(state, x ): state[ 'field' ] *= x def state_getField( state ): return state[ 'field' ] myself = {} state_init( myself ) state_add( myself, 'added' ) state_mult( myself, 2 ) print( state_getField( myself ) )
And the demonstration:
~ $ python Python 2.7.5 (default, Nov 3 2014, 14:26:24) >>> from state import state_init, state_add, state_mult, state_getField >>> myself = {} >>> state_init( myself ) >>> print myself {'field': 'init'} >>> state_add( myself, 'added' ) >>> print myself {'field': 'initadded'} >>> state_mult( myself, 2 ) >>> print myself {'field': 'initaddedinitadded'} >>> print( state_getField( myself ) ) initaddedinitadded
Here's a good link on intrinsic functions whose names are bracketed with underscores: http://www.rafekettler.com/magicmethods.pdf. And, here's a bit more on the Python code map relating stuff to Java and/or C/C++.
VARIABLE_9 = 9
C/C++ extern variable
module variable (global)
def someFunction(): print "I'm someFunction()"
C/C++ function
function
class SomeClass( object ): variable_99 = 99
Java static variable
class attribute (but value can be changed)
def __init__( self ): self.variable_999 = 999
Java constructor Java instance variable
initializer instance variable
def __del__( self ): ...code to clean up instance
C++ destructor
def __hash__( self ): ...code to create unique integer
Java hashCode()
def __eq__( self, other ):
Java equals( other )
def __str__( self ): ...code to translate to string
Java toString()
def __nonzero__( self ): ...code to implement judgement
returns True/False depending on instance
def __dir__( self ): ...code to make dictionary
Java reflection
def someMethod( self ): print "I'm someMethod()"
Java method in C++: member function
method
I was bitten by this regex problem once.
SESSIONID_PATTERN = '[A-Za-z0-9._-]+' (old) SESSIONID_PATTERN = '[A-Za-z0-9._-@]+' (new, wrong) SESSIONID_PATTERN = '[A-Za-z0-9@._-]+' (right)
The expression is used to provide for acceptable characters in a session id (so, an e-mail address was not an acceptable session id). I needed an e-mail address to be valid and simply added the at-sign at the end which made regex think the hyphen was a range, which didn't work as a range and so the whole thing blew up. The hyphen has to be the last character, escaping the at-sign doesn't work either.
Sometimes, to simulate the environment of excerpted code, you need to monkey with the PYTHONPATH in order to avoid using different or augmented import statements from the original environment for the code.
For example, I had to do:
import systemsets.searchui.controllers.sessions import systemsets.searchui.lib.do_query as do_query
instead of
import searchui.controllers.sessions import searchui.lib.do_query as do_query
in this project:
master ~/dev/systemsets $ tree . +-- systemsets | +-- __init__.py | +-- PSAPIException.py | +-- pylons | | +-- config.py | | +-- __init__.py | | +-- request.py | | `-- response.py | +-- searchui | | +-- controllers | | | +-- __init__.py | | | +-- sessionsGlobals.py | | | `-- sessions.py | | +-- __init__.py | | `-- lib | | +-- base.py | | +-- do_query.py | | `-- __init__.py | +-- sets.py | `-- uibase | +-- __init__.py | `-- lib | +-- auth_conditions.py | `-- __init__.py `-- test +-- __init__.py +-- setsTest.py `-- testutils.py
This was annoying as I didn't wish to maintain two versions of sets.py, one in the real environment and one for use in PyDev. (There were reasons I could not use PyDev directly inside the real code base.)
master ~/dev/systemsets $ cat .pydevproject <?xml version="1.0" encoding="UTF-8" standalone="no"?> <?eclipse-pydev version="1.0"?><pydev_project> <pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH"> <path>/${PROJECT_DIR_NAME}</path> <path>/${PROJECT_DIR_NAME}/systemsets</path> </pydev_pathproperty> <pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property> <pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property> </pydev_project>
Sometimes you just gotta stop and sharpen up a saw against something annoying you. This could easily be transformed into a useful command-line script, but od -x already exists there so why? I happen to be using it from other Python code in debugging because I've got a mystifying representation problem that prevents me from understanding (it's the debugger's fault) when \n is a newline (control character) and when it's just a backslash followed by the letter n.
#!/usr/bin/python ''' Given a string, produces a dump a la od -x: "This is a test of the Emergency Broadcast System.": 00000 5468 6973 2069 7320 6120 7465 7374 0a6f 'This is a test.o' 00001 6620 7468 6520 456d 6572 6765 6e63 7920 'f the Emergency ' 00002 4272 6f61 6463 6173 7420 5379 7374 656d 'Broadcast System' 00003 2e '.' @author: Russell Bateman @since: February 2015 ''' import sys LEFT_COLUMN_WIDTH = '%05d' # this value is simply and easily changed PAD = ' ' # padding between the three major columns CENTER_COLUMN_WIDTH = 32 # don't muck lightly with this value RIGHT_COLUMN_WIDTH = 16 # ibid--should be center / 2 def printIt( thing ): sys.stdout.write( thing ) def dotControlCharactersInCopy( string ): # if expect more control characters, fix those too return string.replace( '\n', '.' ) def prepareRow( hexString, width ): hexString = hexString[ :width ] row = [] for _ in range( 0, width ): if len( hexString ) > 0: row.append( hexString[ :2 ] ) hexString = hexString[ 2: ] else: return row return row def chokeUp( string, width ): return string[ width: ] def printHexDumpOfString( string ): hexString = string.encode( "hex" ) dottedCopy = dotControlCharactersInCopy( string ) lineNumber = 0 current = 0 length = len( hexString ) while current < length: # print line number in left column... printIt( LEFT_COLUMN_WIDTH % lineNumber ) printIt( PAD ) # print row/list of elements by groups of two; there are # 2 digits per hex character, 4 per group... row = prepareRow( hexString, CENTER_COLUMN_WIDTH ) middleColumn = '' width = len( row ) for element in range( 0, CENTER_COLUMN_WIDTH/2, 2 ): if element < width: middleColumn += row[ element ] else: middleColumn += ' ' if element+1 < width: middleColumn += row[ element+1 ] else: middleColumn += ' ' middleColumn += ' ' printIt( middleColumn ) printIt( PAD ) # print right-column string copy... rightColumn = dottedCopy[ :RIGHT_COLUMN_WIDTH ] printIt( "'%s'" % rightColumn ) # chock up on counters and strings... 
hexString = chokeUp( hexString, CENTER_COLUMN_WIDTH ) dottedCopy = dottedCopy[ RIGHT_COLUMN_WIDTH: ] lineNumber += 1 current += CENTER_COLUMN_WIDTH printIt( '\n' ) def test(): string = 'This is a test\nof the Emergency Broadcast System.' print '"%s":' % string print printHexDumpOfString( string ) if __name__ == "__main__": test() # vim: set tabstop=4 shiftwidth=4 expandtab:
Below, x is noted as "unresolved" by the compiler (an error) at line 4.
x = None def func(): if not x: x = 9
~ $ python Python 2.7.5 (default, Nov 3 2014, 14:26:24) >>> x = None >>> def func(): ... if not x: ... x = 9 ... >>> func() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "<stdin>", line 2, in func UnboundLocalError: local variable 'x' referenced before assignment
This is solved thus:
x = None def func(): global x if not x: x = 9
~ $ python Python 2.7.5 (default, Nov 3 2014, 14:26:24) >>> x = None >>> def func(): ... global x ... if not x: ... x = 9 ... >>> func()
def traceLog( message ): now = datetime.datetime.now().strftime( '%Y-%m-%d %H:%M:%S.%f' ) sys.stdout.write( 'TRACE: ' + now + ': ' + message + '\n' ) sys.stdout.flush()
TRACE: 2015-03-09 14:07:04.462686: <message>
If you see this error (in PyCharm at least), it means that you've tried to begin to use keyword parameters before non-named ones. Also, it's very Pythonic to apply names to each and every argument.
class BatchRunnerException( Exception ): def __init__( self, message='', argumentName=None, errnoValue=0 ): self.message = message self.argumentName = argumentName self.errnoValue = errnoValue def getMessage( self ): return self.message def getErrnoValue( self ): return self.errnoValue def getArgumentName( self ): return self.argumentName # vim: set tabstop=4 shiftwidth=4 expandtab:
if action != ACTION_RUN and action != ACTION_PUBLISH: raise BatchRunnerException( 'Illegal argument exception (--action)', errnoValue=errno.EINVAL, argumentName='action' ) try: validateDate( startDate ) except Exception as error: raise BatchRunnerException( 'Illegal argument exception (%s)' % error, errnoValue=errno.EINVAL, argumentName='start' )
Consuming this code...
import sys

class BatchRunnerException( Exception ):
    '''
    Exception carrying a message plus, optionally, the name of the offending
    argument and an errno value.

    Call as BatchRunnerException( message, argumentName=..., errnoValue=... ).
    '''
    def __init__( self, *args, **kwargs ):
        # order is message (*args--unnamed), argumentName (**kwargs), errnoValue (**kwargs)
        Exception.__init__( self, *args )
        self.message      = args[ 0 ] if args else ''   # no message given: empty, not IndexError
        self.argumentName = kwargs.get( 'argumentName' )
        self.errnoValue   = kwargs.get( 'errnoValue' )

    def getMessage( self ):
        return self.message

    def getErrnoValue( self ):
        return self.errnoValue

    def getArgumentName( self ):
        return self.argumentName

    def printOnStderr( self, label ):
        '''
        Write "<label> <message>" to stderr, followed by " (<errno>)" when a
        non-zero errno value was supplied. label may be None or empty.
        '''
        if label:
            sys.stderr.write( label )
        if not self.errnoValue:                         # None and 0 both mean "no errno"
            sys.stderr.write( ' %s\n' % self.message )  # the message is text: %s, not %d
        else:
            sys.stderr.write( ' %s (%d)\n' % ( self.message, self.errnoValue ) )
        sys.stderr.flush()

# vim: set tabstop=4 shiftwidth=4 expandtab:
Here's how to import a "complex" module from elsewhere...
So, simple dynamic importing for some module isn't working. Why? Because while runner_obj.py is found, what it imports isn't findable because not in PYTHONPATH. One solution proposed, that works under some circumstances, is static importing. Let's walk that path:
Do it "statically" after adding useful paths to PYTHONPATH:
import os import sys # to get: os.path.dirname() os.path.abspath() sys.path.append() # etc. as needed. Then! Do this: import complex_module # which should work...
Note that sys.path is PYTHONPATH
sys.path
This should work because the os and, especially, sys.path.append() functions were used to add the necessary path or paths as required to a) reach the complex module and b) reach what the complex module needs. (This may even be just one root-ish path and the deeper paths to reach the module should do the trick.)
An example. Here's the top of a complex module we're importing. The module itself is at sandboxes/webapp/extensions/scorecard/model. Let's call it runner_obj.py.
import json import ordereddict import datetime import sqlalchemy import sqlalchemy.orm from sqlalchemy.sql import func from uibase.model.meta import Base from uibase.lib.db_base import BaseEntry, randString ...
We ourselves are running relative to webapp, sandboxes/webapp/extensions/scorecard/support/scorecard-runner and that needs to be factored in as the starting point.
Modules meta.py and db_base.py are found at sandboxes/repo/run/webapp/uibase/lib. The intersection is, of course, webapp (which is actually the application root when this is all installed in /opt).
So, what we want to do is determine the least common denominating path a) to runner_obj.py and b) the other two:
webapp/extensions/scorecard/model/runner_obj.py webapp/uibase/model/meta.py webapp/uibase/lib/db_base.py
Then, we create the import statements:
import extensions.scorecard.model.runner_obj import uibase.model.meta import uibase.lib.db_base
We don't need to express the second two since runner_obj.py is doing that already, we just need the least common denominator in PYTHONPATH. First, we need a manifest constant here to represent where webapp is in relation to us:
import os
import sys

WEBAPP_PATH = '../../../'       # where webapp is in relation to us

# the path we'll add to represent webapp must be absolute
webapp_abs = os.path.abspath( WEBAPP_PATH )

# appending to a list cannot fail, so no try/except is needed; the test just
# keeps the same entry from piling up if this runs more than once
if webapp_abs not in sys.path:
    sys.path.append( webapp_abs )
Now we're ready to import our module and its dependents should follow:
import os
import sys

WEBAPP_PATH = '../../../'

# the path we'll add to represent webapp must be absolute
webapp_abs = os.path.abspath( WEBAPP_PATH )

# appending to a list cannot fail, so no try/except is needed around this
sys.path.append( webapp_abs )

import extensions.scorecard.model.runner_obj

# (this is the example that does NOT work; the failing use of runner_obj is
# deliberately left in place)
RunnerObject = runner_obj.RunnerObject()
Nope! This doesn't work, at very least because of the dots. Python says extensions, scorecard and model are not defined (see highlighted line). It won't even compile. Dynamic importing works now, however:
import os
import imp
import sys

WEBAPP_PATH     = '../../../'
# imp.load_source() wants the path of the source file itself, not of the
# directory that contains it
RUNNER_OBJ_PATH = 'extensions/scorecard/model/runner_obj.py'

# the path we'll add to represent webapp must be absolute
webapp_abs = os.path.abspath( WEBAPP_PATH )

# appending to a list cannot fail, so no try/except is needed around this
sys.path.append( webapp_abs )

absPath = os.path.join( webapp_abs, RUNNER_OBJ_PATH )

try:
    runner_obj = imp.load_source( 'runner_obj', absPath )
except IOError as error:
    # means couldn't find the module from the path -- fix RUNNER_OBJ_PATH
    # (re-raised: carrying on would only fail below with a NameError)
    raise
except ImportError as error:
    # means something like there's a dependency not resolvable using
    # PYTHONPATH -- fix sys.path.append()
    raise

RunnerObject = runner_obj.RunnerObject()
The trick that works is to set up PYTHONPATH to support the module to be imported, but still use imp.load_source() to do the import dynamically.
import random
import string

def getRandomId():
    '''
    Return a random identifier of 6 distinct characters drawn from the
    lower-case ASCII letters and the digits.

    random.sample() picks without replacement, so no character repeats.
    '''
    # ascii_lowercase, unlike string.lowercase, does not vary with the locale;
    # the name "alphabet" avoids shadowing the set builtin
    alphabet = string.ascii_lowercase + string.digits
    return ''.join( random.sample( alphabet, 6 ) )
I began looking more seriously into best practice for exception-handling in Python. Here is the best of what I read. I summarize some of what I've learned here:
Look before you leap (LBYL) is the wrong-headed impulse I am guilty of which is explained by more than 2 decades of C and other procedural programming languages in which there's little chance of recovering after even simple faults.
In short, LBYL is just damned wrong! This isn't specifically a lesson from Python exceptions, but holds also for real programming languages like Java and C#.
If you call str.find( string ), you don't expect an exception for the string not being found. Instead, you'd expect an error code or appropriate value coming back to indicate that (None or -1). However, if you pass a defective string or create a case where a language anomaly like str[ i ] constitutes an invalid index, etc., an exception is appropriate.
This is a little more esoteric, but makes some sense. Don't throw stuff up the call hierarchy to levels at which it isn't relevant.
The example given is managing an internal cache using a file. Later, perhaps the implementation might no longer resort to using a file.
Perhaps the implementation cannot open the cache file, the underlying open() failing. Don't leak this failure out through the interface because it's not relevant, it breaks encapsulation since the consumer doesn't (shouldn't) care that it's a file cache. Instead, wrap in custom exception, e.g.: CacheFailedException preserving the information wrapped inside:
class CacheFailedException( Exception ): pass def doCachingStuff(): try: cache = open( path ) ... except IOError as e: raise CacheFailedException( 'Caching error: %s' % e )
Benefits:
More extensively, if you need to preserve the original traceback (unusual):
class CacheFailedException( Exception ): pass def doCachingStuff(): try: cache = open( path ) ... except IOError: excClass, exc, traceback = sys.exc_info() x = CacheFailedException( 'Caching error: %s' % exc ) raise x.__class__, x, traceback
...each function in terms of its contract:
Don't make up named exceptions for stuff that's common KeyError, ValueError, TypeError, etc. Implement such functionality and error handling in imitation of how it's already done in Python so that consumers won't be surprised.
Be careful not to use exception handling for flow control, but do create named exceptions for special purposes. What happens?
In the following representation, an application that calls a function A that calls a function B that calls a function C which detects an error and raises it as NamedException, functions B, then A, then the application can trap that exception and do something else with it. If no code catches it, it will make it all the way back to Python which will print something that includes NamedException and usually the message it included.
Python Python Python Python Python Python Python Python Python Python Python | ^ (Python will print stack trace at the bottom of which | | will be printed "NamedException" and the message field.) v | Application main() | | | (if Application main() doesn't catch the exception...) | | | | v | Function A() | | | (if Function A() doesn't catch the exception...) | | | | v | Function B() | | | (if Function B() doesn't catch the exception...) | | | | v | Function C() | raise NamedException( "I'm a named exception; something bad happened." )
Here's the code:
class NamedException( Exception ):            # purpose-built exception type for this demonstration
    def __init__( self, *args, **kwargs ):    # note: Exception.__init__() is not invoked from here
        self.message = args[ 0 ]              # keeps the first positional argument as the message

    def getMessage( self ):
        return self.message                   # the message handed to the constructor

def main():                                   # application entry point: calls A, which calls B, which calls C
    functionA()

def functionA():                              # does not catch anything; just calls functionB()
    functionB()

def functionB():                              # does not catch anything; just calls functionC()
    functionC()

def functionC():                              # detects the "error" and raises it; nothing above catches it
    raise NamedException( "I'm a named exception; something bad happened." )

if __name__ == "__main__":
    main()
...and here's the behavior:
~ $ python poop.py Traceback (most recent call last): File "poop.py", line 21, in main() File "poop.py", line 9, in main functionA() File "poop.py", line 12, in functionA functionB() File "poop.py", line 15, in functionB functionC() File "poop.py", line 18, in functionC raise NamedException( "I'm a named exception; something bad happened." ) __main__.NamedException
# int() cannot convert this text, so the except clause below always runs;
# it shows what str( e ) yields for the caught exception.
try:
    x = int( 'This is a test' )
except Exception as e:
    print( 'Handling error thus: print( str( e ) )' )
    print( str( e ) )
print( 'That\'s all! *****************' )
$ python -i Python 2.7.5 (default, Apr 10 2015, 08:09:05) [GCC 4.8.3 20140911 (Red Hat 4.8.3-7)] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> import poop Handling error thus: print( str( e ) ) invalid literal for int() with base 10: 'This is a test' That's all! *****************
# profile.py
# NOTE(review): a module with this name hides the standard library's own
# "profile" module for anything importing it from the same path -- confirm
# that is acceptable.
import time
from functools import wraps

PROF_DATA = {}    # function name -> [ call count, [ elapsed seconds per call ] ]

def profile(fn):
    '''
    Decorator that records, under fn.__name__ in PROF_DATA, how many times
    fn was called and how long each call took (wall-clock, time.time()).
    '''
    @wraps(fn)
    def with_profiling(*args, **kwargs):
        start_time = time.time()
        try:
            return fn(*args, **kwargs)
        finally:
            # recorded in finally so that a call that raises is still counted
            elapsed_time = time.time() - start_time
            record = PROF_DATA.setdefault(fn.__name__, [0, []])
            record[0] += 1
            record[1].append(elapsed_time)

    return with_profiling

def print_prof_data():
    ''' Print, per profiled function, its call count, maximum and average time. '''
    for fname, data in PROF_DATA.items():
        max_time = max(data[1])
        avg_time = sum(data[1]) / len(data[1])
        # a single parenthesized argument prints identically under Python 2 and 3
        print('Function %s called %d times.  Execution time max: %.3f, average: %.3f'
              % (fname, data[0], max_time, avg_time))

def clear_prof_data():
    ''' Forget everything recorded so far. '''
    # cleared in place so that code holding a reference to PROF_DATA (for
    # example via "from profile import PROF_DATA") sees the reset too
    PROF_DATA.clear()

# x.py:
#
#   @profile
#   def your_function(...):
#       ...
# profile.py
# NOTE(review): a module with this name hides the standard library's own
# "profile" module for anything importing it from the same path -- confirm
# that is acceptable.
import time
from functools import wraps

PROF_DATA = {}    # function name -> [ call count, [ elapsed seconds per call ] ]

def profile(fn):
    '''
    Decorator that records, under fn.__name__ in PROF_DATA, how many times
    fn was called and how long each call took (wall-clock, time.time()).
    '''
    @wraps(fn)
    def with_profiling(*args, **kwargs):
        start_time = time.time()
        try:
            return fn(*args, **kwargs)
        finally:
            # recorded in finally so that a call that raises is still counted
            elapsed_time = time.time() - start_time
            record = PROF_DATA.setdefault(fn.__name__, [0, []])
            record[0] += 1
            record[1].append(elapsed_time)

    return with_profiling

def print_prof_data():
    ''' Print, per profiled function, its call count, maximum and average time. '''
    for fname, data in PROF_DATA.items():
        max_time = max(data[1])
        avg_time = sum(data[1]) / len(data[1])
        # a single parenthesized argument prints identically under Python 2 and 3
        print('Function %s called %d times.  Execution time max: %.3f, average: %.3f'
              % (fname, data[0], max_time, avg_time))

def clear_prof_data():
    ''' Forget everything recorded so far. '''
    # cleared in place so that code holding a reference to PROF_DATA (for
    # example via "from profile import PROF_DATA") sees the reset too
    PROF_DATA.clear()
@profile def your_function(...): ...
This is almost always...
An example of creating a specialized or proprietary exception and raising it.
class StuffCachingError( Exception ):
    ''' Raised when the cache cannot be used; hides that the cache is a file. '''
    pass

def do_stuff():
    '''
    Work with the cache file named by filename.

    :raises StuffCachingError: when an IOError occurs opening or using the
        cache; the original error text is carried in the message.
    '''
    try:
        # "with" closes the cache file even when the work on it fails
        with open( filename ) as cache:
            # do stuff with cache
            pass
    except IOError as e:
        raise StuffCachingError( 'Caching error: %s' % e )
It's not as explicit as in Java.
import foo

def main():
    ''' Call foo.foo() and print the text of whatever exception escapes it. '''
    try:
        foo.foo()
    except Exception as e:
        # str( e ) rather than e.message: the message attribute is deprecated
        # since Python 2.6 and the printed text is the same
        print( str( e ) )
import bar

def foo():
    ''' Call bar.bar(); nothing is caught here, so what it raises propagates. '''
    # don't have to say we might throw an exception here
    bar.bar()
def bar():
    ''' Always raise Exception( 'Busted!' ). '''
    raise Exception( 'Busted!' )
~ $ python Python 2.7.5 (default, Nov 3 2014, 14:26:24) [GCC 4.8.3 20140911 (Red Hat 4.8.3-7)] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> import main >>> main.main() Busted!
Here's a pretty thing...a way to find the nth occurrence of a substring in a string. I first used it with another brilliant construct, that of using mmap() to turn a file into a virtual string (albeit actually a bytearray).
import os
import sys
import mmap

HIT_START_TAG = b'<hit'    # bytes, since a memory-mapped file is searched as bytes

def findNth( haystack, needle, n=1 ):
    '''
    Find the nth occurrence of a substring in a string.
    :param haystack: the string (anything with a str-like find() method).
    :param needle: the substring to search for.
    :param n: the nth magnitude; values below 1 behave like 1.
    :return: offset in haystack of needle, or -1 if there is no nth occurrence.
    '''
    width = len( needle )
    start = haystack.find( needle )
    while start >= 0 and n > 1:
        # resume just past the previous hit: occurrences do not overlap
        start = haystack.find( needle, start + width )
        n -= 1
    return start

# The function must be defined before it is used, so the usage comes last.
if __name__ == '__main__':
    pathname = sys.argv[ 1 ]
    with open( pathname, 'rb' ) as fp:
        mapped = mmap.mmap( fp.fileno(), 0, access=mmap.ACCESS_READ )
        try:
            first = findNth( mapped, HIT_START_TAG, 1 )
        finally:
            mapped.close()
#!/usr/bin/python
# -*- coding: utf-8 -*-
'''
Given a string, produces a dump à la od -x:

"This is a test
of the Emergency Broadcast System.":
00000 5468 6973 2069 7320 6120 7465 7374 0a6f  'This is a test.o'
00010 6620 7468 6520 456d 6572 6765 6e63 7920  'f the Emergency '
00020 4272 6f61 6463 6173 7420 5379 7374 656d  'Broadcast System'
00030 2e                                       '.'

The left column is the byte offset (in hexadecimal) of the row's first byte.

@author: Russell Bateman
@since:  February 2015
'''
import binascii
import sys

LEFT_COLUMN_WIDTH   = '%05X'  # this value is simply and easily changed
PAD                 = ' '     # padding between the three major columns
CENTER_COLUMN_WIDTH = 32      # don't muck lightly with this value
RIGHT_COLUMN_WIDTH  = 16      # ibid--should be center / 2

def printIt( thing ):
  ''' Write thing to standard output exactly as given (no newline added). '''
  sys.stdout.write( thing )

def dotControlCharactersInCopy( string ):
  ''' Return a copy of string in which all but printable ASCII becomes '.'. '''
  return ''.join( c if ' ' <= c <= '~' else '.' for c in string )

def prepareRow( hexString, width ):
  ''' Return the first width hex digits of hexString as a list of 2-digit bytes. '''
  hexString = hexString[ :width ]
  return [ hexString[ i:i+2 ] for i in range( 0, len( hexString ), 2 ) ]

def chokeUp( string, width ):
  ''' Return string minus its first width characters. '''
  return string[ width: ]

def printHexDumpOfString( string ):
  '''
  Print string as rows of offset, hex bytes in groups of two, and quoted text.
  :param string: bytes, or text (which is dumped as its UTF-8 encoding).
  '''
  data       = string if isinstance( string, bytes ) else string.encode( 'utf-8' )
  hexString  = binascii.hexlify( data ).decode( 'ascii' )
  # the text column is built from the very bytes being dumped so that the two
  # columns always agree, one character per byte
  dottedCopy = dotControlCharactersInCopy( ''.join( chr( b ) for b in bytearray( data ) ) )
  offset     = 0

  while hexString:
    # print byte offset in left column...
    printIt( LEFT_COLUMN_WIDTH % offset )
    printIt( PAD )

    # print row/list of elements by groups of two; there are
    # 2 digits per hex character, 4 per group; a short last row is filled out
    # with two blanks per missing byte so the right column stays aligned...
    row  = prepareRow( hexString, CENTER_COLUMN_WIDTH )
    row += [ '  ' ] * ( RIGHT_COLUMN_WIDTH - len( row ) )
    middleColumn = ''.join( row[ i ] + row[ i+1 ] + ' '
                            for i in range( 0, RIGHT_COLUMN_WIDTH, 2 ) )
    printIt( middleColumn )
    printIt( PAD )

    # print right-column string copy...
    printIt( "'%s'" % dottedCopy[ :RIGHT_COLUMN_WIDTH ] )

    # choke up on counters and strings...
    hexString  = chokeUp( hexString, CENTER_COLUMN_WIDTH )
    dottedCopy = dottedCopy[ RIGHT_COLUMN_WIDTH: ]
    offset    += RIGHT_COLUMN_WIDTH
    printIt( '\n' )

def test():
  ''' Dump the sample sentence used in the module documentation. '''
  string = 'This is a test\nof the Emergency Broadcast System.'
  printIt( '"%s":\n' % string )
  printHexDumpOfString( string )

if __name__ == "__main__":
  args = sys.argv
  if len( args ) < 2:
    sys.stderr.write( 'Usage: %s <pathname>\n' % args[ 0 ] )
    sys.exit( 1 )
  pathname = args[ 1 ]
  with open( pathname, 'rb' ) as fp:
    string = fp.read()
  printHexDumpOfString( string )

# vim: set tabstop=2 shiftwidth=2 expandtab:
Given the following configuration file...
[scorecard_runner] hostname = 10.10.8.155 password = psadpsad runner_set = { "measures" : { "queryid" : "Q4V6braw", "measureid_list" : [ "RAK", "ABA", "TMS", "IAK", "AAB", "EDK", "CDC", "TSC", "COL", "CIS", "AST", "CBP", "HBP", "PEA", "AWC", "TCC", "ASM" ], "payer_types_list" : [ "Commercial" ] }, "initiatives" : { "queryid" : "Q4V6braw", "measureid_list" : [ "RAK" ], "payer_types_list" : [ "Commercial", "Medicare" ] } }
The following reads it and initializes a number of variables for it.
import json
try:
    import ConfigParser                      # Python 2 name
except ImportError:
    import configparser as ConfigParser      # the same module under its Python 3 name

def getConfiguration( config, section, item ):
    '''
    Look up one item of one section of an already-read configuration.
    :param config: the ConfigParser object to consult.
    :param section: name of the section.
    :param item: name of the option in that section.
    :return: the option's value or None if the option does not exist.
    :raises ConfigParser.NoSectionError: if the section itself is missing.
    '''
    try:
        return config.get( section, item )
    except ConfigParser.NoSectionError:
        raise
    except ConfigParser.NoOptionError:
        # item not found; no matter, return None
        return None
    except Exception:
        # other exception? unlikely, but just return None
        return None

# Here's the business...
if __name__ == '__main__':
    config = ConfigParser.RawConfigParser()
    config.read( 'configuration.ini' )

    hostname   = getConfiguration( config, 'scorecard_runner', 'hostname' )
    password   = getConfiguration( config, 'scorecard_runner', 'password' )
    jsonString = getConfiguration( config, 'scorecard_runner', 'runner_set' )
    if jsonString is None:
        # say what is wrong rather than fail on None.replace() below
        raise ValueError( 'No runner_set in section scorecard_runner' )
    jsonString = jsonString.replace( '\n', ' ' )

    runner_dict = json.loads( jsonString )

    # one variable per query id so that the second does not overwrite the first
    measures           = runner_dict[ 'measures' ]
    measuresQueryid    = measures[ 'queryid' ]
    initiatives        = runner_dict[ 'initiatives' ]
    initiativesQueryid = initiatives[ 'queryid' ]

    print( ' hostname = %s' % ( hostname, ) )
    print( ' password = %s' % ( password, ) )
    print( ' jsonString = %s' % ( jsonString, ) )
    print( ' measures = %s' % ( measures, ) )
    print( ' measures.queryid = %s' % ( measuresQueryid, ) )
    print( ' initiatives = %s' % ( initiatives, ) )
    print( 'initiatives.queryid = %s' % ( initiativesQueryid, ) )
This is instructive.
~ $ python >>> number = 1 >>> number 1 >>> string = '1' >>> string '1' >>> z = int( number ) >>> z 1 >>> z = int( string ) >>> z 1
When tempted to use isinstance( object ) or type( object ) to determine what object is, just don't. For one thing, it's slow. For another, it isn't Pythonic. The right way to do it is just to use it as if it were what you want, an integer, a dictionary, etc., then use try ... except to recover. The interpreter is faster for it:
string = 1    # (integer not string!)
try:
    string[ 0 ]
except TypeError:
    # Oops, I thought this was a string, but it's something else...
    pass      # (a clause holding only a comment is a syntax error; recover here)
$ python Python 2.7.5 (default, Apr 10 2015, 08:09:05) >>> d = {'orange': 50, 'chartreuse': 89, 'red': 110, 'fushia': 18, 'green': 82, 'blue': 40} >>> d {'blue': 40, 'fushia': 18, 'chartreuse': 89, 'green': 82, 'orange': 50, 'red': 110} >>> for key, value in sorted( d.items() ): ... print( key, value ) ... ('blue', 40) ('chartreuse', 89) ('fushia', 18) ('green', 82) ('orange', 50) ('red', 110) >>> import collections >>> sd = collections.OrderedDict( sorted( d.items() ) ) >>> sd OrderedDict([('blue', 40), ('chartreuse', 89), ('fushia', 18), ('green', 82), ('orange', 50), ('red', 110)])
import base64
import urllib2
import globals

# NOTE(review): "charset=htf-8" in the text below looks like a typo for
# "utf-8" (compare CONTENT_TYPE_XML); left untouched here.
''' Client raises URLError or HTTPError. When posting JSON, use content type of 'application/x-www-form-urlencoded' else, use content type of 'application/xml; charset=htf-8'. '''

CONTENT_TYPE_FORM = 'application/x-www-form-urlencoded' # for POSTs of JSON payloads
CONTENT_TYPE_XML = 'application/xml; charset=utf-8' # for PUTs and POSTs of XML payloads

def postJson( uri, payload ):
    ''' POST payload to uri using the form content type; returns what urlopen() returns. '''
    authHeader = prepareAuthHeader( CONTENT_TYPE_FORM )
    return urllib2.urlopen( urllib2.Request( uri, headers=authHeader ), payload )

def postXml( uri, payload ):
    ''' POST payload to uri using the XML content type; returns what urlopen() returns. '''
    authHeader = prepareAuthHeader( CONTENT_TYPE_XML )
    return urllib2.urlopen( urllib2.Request( uri, headers=authHeader ), payload )

def put( uri, payload ):
    ''' PUT payload to uri; returns what the opener's open() returns. '''
    authHeader = prepareAuthHeader( CONTENT_TYPE_XML )
    opener = urllib2.build_opener( urllib2.HTTPHandler )
    request = urllib2.Request( uri, data=payload, headers=authHeader )
    # NOTE(review): the header set was built with the XML content type, yet a
    # JSON content type is added here -- confirm which of the two is intended.
    request.add_header( 'Content-Type', 'application/json' )
    # forces the request method to PUT
    request.get_method = lambda: 'PUT'
    return opener.open( request )

def get( uri ):
    ''' GET uri (no content type, authorization only) and return the body read from the response. '''
    authHeader = prepareAuthHeader()
    request = urllib2.Request( uri, headers=authHeader )
    answer = urllib2.urlopen( request ).read()
    return answer

def delete( uri ):
    ''' Placeholder: builds the header but sends nothing and returns None. '''
    authHeader = prepareAuthHeader()
    if uri and authHeader:
        # NOTE(review): no request is made here -- presumably unfinished; confirm.
        pass

def prepareAuthHeader( contentType=None ):
    '''
    Build the request headers as a dictionary: a Basic 'Authorization' entry
    made from globals.USERNAME and globals.PASSWORD, plus a 'Content-type'
    entry when contentType is given.
    '''
    AUTH_HEADER = {}
    if contentType:
        AUTH_HEADER[ 'Content-type' ] = contentType
    userAndPassword = globals.USERNAME + ':' + globals.PASSWORD
    base64Encoding = base64.b64encode( userAndPassword )
    authorization = 'Basic ' + base64Encoding
    AUTH_HEADER[ 'Authorization' ] = authorization
    return AUTH_HEADER
Invoke Python thus:
$ python -i
To access values in tuple, use the square brackets for slicing along with the index or indices to obtain value available at that index. For example, ...
tup1 = ('physics', 'chemistry', 1997, 2000); tup2 = (1, 2, 3, 4, 5, 6, 7 ); print "tup1[ 0 ]: ", tup1[ 0 ] print "tup2[ 1:5 ]: ", tup2[ 1:5 ] When the above code is executed, it produces the following result... tup1[ 0 ]: physics tup2[ 1:5 ]: (2, 3, 4, 5)
When the above code is executed, it produces the following result...
tup1[ 0 ]: physics tup2[ 1:5 ]: (2, 3, 4, 5)
Recipe for getting a JSON string from the database, adding to it and reincorporating it into the database. Let's say that the result we're adding to what's already there is of "type" QueryResult. This is pseudocode: I'm not fleshing out the SQLAlchemy and other database underpinnings.
# persistence.py:
import json
import sqlalchemy

oid = None    # OID for our object.

def persistResult( resultToAdd ):
  '''
  Append resultToAdd to the JSON list of results held by the database object
  whose id is the module-level oid.
  :param resultToAdd: the QueryResult to add.
  :raises Exception: if anything goes wrong; the message names the result.
  '''
  try:
    dbObject   = getDatabaseObject( oid )                # get the existing list of results
    resultList = json.loads( dbObject.result_list )      # make a Python list of them
    resultList.append( resultToAdd.toJson() )            # append the most recent result to add
    dbObject.result_list = json.dumps( resultList )      # dump the Python/JSON list as a string
  except Exception as e:
    # "as e" is required: without it the name e used below is not bound
    raise Exception( 'Unable to persist result (%s): %s' % ( resultToAdd.getResultId(), e ) )

# database.py:
class DatabaseObject( Base ):
  ''' The persisted object; result_list holds a JSON list as a string. '''
  def __init__( self ):
    self.result_list = Column( String( 1024 ) )
    # etc.

def getDatabaseObject( oid ):
  '''
  Return the first DatabaseObject whose id is oid.
  :raises Exception: if the query fails; the message names the oid.
  '''
  try:
    qo = Session.query( DatabaseObject )
    qf = qo.filter( DatabaseObject.id == oid )
    return qf.first()
  except Exception as e:
    raise Exception( 'Botched query operation (%s): %s' % ( oid, e ) )

# queryresult.py:
from time import time

def now():
  return time()    # seconds since the Epoch

class QueryResult( object ):
  ''' One result of one query, with a completion flag and a last-touched time. '''
  def __init__( self, queryid=None, resultid=None ):
    self.resultid = resultid
    self.queryid  = queryid
    self.complete = False
    self.last     = now()

  def getQueryId( self ):
    return self.queryid

  def getResultId( self ):
    return self.resultid

  def sinceLastLook( self ):
    ''' Seconds elapsed since creation or since markComplete() was called. '''
    return now() - self.last

  def isComplete( self ):
    return self.complete

  def markComplete( self ):
    self.complete = True
    self.last     = now()

  def toJson( self ):
    ''' Return a dictionary (ready for json.dumps()) of the two identifiers. '''
    return { 'resultid' : self.resultid, 'queryid' : self.queryid }

# vim: set tabstop=2 shiftwidth=2 expandtab:
I had need of a file-type validator for something I was doing. Thought I'd put it here.
Test first, code second...
import sys
import unittest

import ftvalidate
import testutilities

PRINT = True         # change to False when finished debugging...

# NOTE(review): the XML fixtures below had lost all of their markup (leaving
# text that is not XML at all); the tags are reconstructed from the markers
# that ftvalidate searches for -- confirm against the original fixtures.
XML_DECLARATION = '<?xml version="1.0" encoding="UTF-8"?>'

GOOD_JSON = '{ "json" : "This is a JSON!" }'
BAD_JSON  = '<json> This isn\'t JSON! </json>'
GOOD_XML  = XML_DECLARATION + '<xml> This is XML! </xml>'
FHIR_XML  = XML_DECLARATION + '<Patient xmlns="http://hl7.org/fhir"> This is FHIR XML! </Patient>'
CDA_XML   = XML_DECLARATION + '<ClinicalDocument xmlns="urn:hl7-org:v3"> This is merely CDA XML! </ClinicalDocument>'
CCD_XML   = XML_DECLARATION + '''
<ClinicalDocument xmlns="urn:hl7-org:v3">
  <typeId root="2.16.840.1.113883.1.3" extension="POCD_HD000040" />
  This is CDA XML!
</ClinicalDocument>
'''
BAD_XML   = '{ "xml" : "This isn\'t XML" }'
GOOD_PDF  = 'path/to/file.pdf'
# GOOD_PDF = './GoodFile.pdf'   # gotta copy some PDF to this name in the local subdirectory
BAD_PDF   = 'This ain\'t no PDF file.'


class FileTypeValidateTest( unittest.TestCase ):
    """
    Test ftvalidate.py.
    """
    @classmethod
    def setUpClass( cls ):
        testutilities.turnOnPrinting( PRINT )

    def setUp( self ):
        pass

    def tearDown( self ):
        pass

    def _validateTemporaryFile( self, extension, contents, fileType ):
        """ Validate contents, written to a temporary file, as fileType; the file is always erased. """
        temp = testutilities.createTemporaryFile( extension, contents )
        try:
            return ftvalidate.validateFile( fileType, temp )
        finally:
            testutilities.eraseTemporaryFile( temp )

    def testGoodFileAsJson( self ):
        testutilities.printTestCaseName( sys._getframe().f_code.co_name )
        self.assertTrue( self._validateTemporaryFile( '.json', GOOD_JSON, 'json' ) )

    def testBadFileAsJson( self ):
        testutilities.printTestCaseName( sys._getframe().f_code.co_name )
        self.assertFalse( self._validateTemporaryFile( '.json', BAD_JSON, 'json' ) )

    def testGoodFileAsXml( self ):
        testutilities.printTestCaseName( sys._getframe().f_code.co_name )
        self.assertTrue( self._validateTemporaryFile( '.xml', GOOD_XML, 'xml' ) )

    def testBadFileAsXml( self ):
        testutilities.printTestCaseName( sys._getframe().f_code.co_name )
        self.assertFalse( self._validateTemporaryFile( '.xml', BAD_XML, 'xml' ) )

    @unittest.skip( "requires set-up by hand" )
    def testGoodFileAsPdf( self ):
        """
        In order for this test to pass, you must supply the path to a valid PDF file in
        GOOD_PDF.
        """
        testutilities.printTestCaseName( sys._getframe().f_code.co_name )
        result = ftvalidate.validateFile( 'pdf', GOOD_PDF )
        self.assertTrue( result )

    def testBadFileAsPdf( self ):
        testutilities.printTestCaseName( sys._getframe().f_code.co_name )
        self.assertFalse( self._validateTemporaryFile( '.pdf', BAD_PDF, 'pdf' ) )

    # XML special distinctions ----------------------------------------------------
    def testFoundFhir( self ):
        testutilities.printTestCaseName( sys._getframe().f_code.co_name )
        self.assertTrue( self._validateTemporaryFile( '.xml', FHIR_XML, 'fhir' ) )

    def testFoundCcd( self ):
        testutilities.printTestCaseName( sys._getframe().f_code.co_name )
        self.assertTrue( self._validateTemporaryFile( '.xml', CCD_XML, 'ccd' ) )

    def testFoundCda( self ):
        testutilities.printTestCaseName( sys._getframe().f_code.co_name )
        self.assertTrue( self._validateTemporaryFile( '.xml', CDA_XML, 'cda' ) )

    # String content tests --------------------------------------------------------
    def testGoodStringAsJson( self ):
        testutilities.printTestCaseName( sys._getframe().f_code.co_name )
        self.assertTrue( ftvalidate.validateString( 'json', GOOD_JSON ) )

    def testBadStringAsJson( self ):
        testutilities.printTestCaseName( sys._getframe().f_code.co_name )
        self.assertFalse( ftvalidate.validateString( 'json', BAD_JSON ) )

    def testGoodStringAsXml( self ):
        testutilities.printTestCaseName( sys._getframe().f_code.co_name )
        self.assertTrue( ftvalidate.validateString( 'xml', GOOD_XML ) )

    def testBadStringAsXml( self ):
        testutilities.printTestCaseName( sys._getframe().f_code.co_name )
        self.assertFalse( ftvalidate.validateString( 'xml', BAD_XML ) )


if __name__ == '__main__':
    unittest.main()
""" This is one of a greater collections of format validators that are callable to enforce or inform as to whether a file or (sometimes) string is in correct JSON, XML, PDF, XML/CDA, XML/CCD, XML/CCDA, XML/FHIR, etc.) Entry into this is possible via main() for testing and incidental or command-line use, and also an entry point callable as a web end-point. :author Russell Bateman :since July 2016 """ import os import sys import json from copy import copy from subprocess import Popen, PIPE import xml.etree.ElementTree as ElementTree def main( argv ): argcount = len( argv ) if '--help' or '-h' in argv: print( '%s <filetype> filepath' % argv[ 0 ] ) return -1 if argcount < 3: print( 'Not enough arguments' ) return -1 fileType = argv.pop( 1 ) # (remove and return file type) if argcount == 1 and os.path.exists( argv[ 1 ] ): return 0 if validateFile( fileType, argv[ 1 ] ) else -1 return -1 TYPES = { 'json' : 1, 'xml' : 2, 'pdf' : 3, 'cda' : 4, 'ccd' : 5, 'fhir' : 6, 'JSON' : 1, 'XML' : 2, 'PDF' : 3, "CDA" : 4, "CCD" : 5, "FHIR" : 6 } CORRECT_TYPE = { 0 : 'BAD', 1 : 'json', 2 : 'xml', 3 : 'pdf', 4 : 'cda', 5 : 'ccd', 6 : 'fhir' } GOOD_RESULT = 'application/%s' def validateFile( fileType=None, filepath=None ): fileType = CORRECT_TYPE.get( TYPES.get( fileType, 0 ), 'BAD' ) if fileType == 'BAD': return False elif fileType == 'json': try: with open( filepath, 'r' ) as f: json.load( f ) return True except( IOError, ValueError ): return False else: mimeType = getLinuxFileMimeType( filepath ) if "ERROR" in mimeType: # (likely filepath is bad or no privileges) return False result = GOOD_RESULT % ( 'pdf' if fileType == 'pdf' else 'xml' ) result = result in mimeType.strip() if not result: return False if not fileType in [ 'fhir', 'ccd', 'cda' ]: return True return validateObliqueXml( fileType, filepath ) def getLinuxFileMimeType( filepath ): """ Magic Linux command that says what it thinks this file is. 
""" linuxFileCommand = '/usr/bin/file -b --mime %s' % filepath proc = Popen( linuxFileCommand, shell=True, stdout=PIPE ) (mimeType, error) = proc.communicate( ) return mimeType XML_MARKERS = { 'fhir' : 'xmlns="http://hl7.org/fhir"', 'cda' : 'xmlns="urn:hl7-org:v3"', 'ccd' : 'xmlns="urn:hl7-org:v3"' } CCD_MARKERS = [ '<typeId', 'extension="POCD_' ] def validateObliqueXml( fileType, filepath ): """ This file is a candidate for being FHIR, CCD (CCDA) or CDA. """ fp = open( filepath ) f = mmap.mmap( fp.fileno(), 0, access=mmap.ACCESS_READ ) if fileType == 'fhir': return f.find( XML_MARKERS[ 'fhir' ] ) > 0 if fileType in [ 'ccd', 'cda' ]: ok = f.find( XML_MARKERS[ fileType ] ) if not ok: return False if fileType == 'cda': # enough to be a valid CDA... return True # to return True, this must be a valid CCD (including CCDA) meaning both markers here... if f.find( CCD_MARKERS[ 0 ] ) < 1: return False if f.find( CCD_MARKERS[ 1 ] ) < 1: return False return True import mmap def validateString( fileType=None, string=None ): """ Support by-string as a useful option? """ fileType = CORRECT_TYPE.get( TYPES.get( fileType, 0 ), 'BAD' ) if fileType == 'BAD': return False elif fileType == 'json': try: json.loads( string ) return True except ValueError: return False elif fileType == 'xml': try: elementTree = ElementTree.fromstring( string ) loadXmlAsDictionary( elementTree, root=True ) except ElementTree.ParseError: return False # TODO: apply extra tests for FHIR, CDA, CCD, etc? 
return True elif fileType == 'pdf': return False # (unsupported by design) return False def loadXmlAsDictionary( elementTree, root=True ): if root: return { elementTree.tag : loadXmlAsDictionary( elementTree, False ) } dictionary = copy( elementTree.attrib ) if elementTree.text: dictionary[ "_text" ] = elementTree.text for x in elementTree.findall( "./*" ): if x.tag not in dictionary: dictionary[ x.tag ] =[] dictionary[ x.tag ].append( loadXmlAsDictionary( x,False ) ) return dictionary if __name__ == "__main__": if len( sys.argv ) <= 1: sys.exit( main( [ '--help' ] ) ) elif len( sys.argv ) >= 1: sys.exit( main( sys.argv ) ) # vim: set tabstop=4 shiftwidth=4 expandtab:
Handy if a bit off-the-cuff test utilities I threw in...
import os
import sys
import tempfile

PRINT = False    # whether printOrNot() and printTestCaseName() print; see turnOnPrinting()

def spinCommandLine( scriptName=None, commandLine=None ):
    ''' This will turn text with spaces into a command line as if sys.argv. '''
    if not commandLine:
        return []
    return [ scriptName ] + commandLine.split( ' ' )

def createTemporaryFile( extension=None, contents=None ):
    '''
    Create a temporary file and return its path; the caller erases it.
    :param extension: suffix for the file name ('.tmp' if not given).
    :param contents: optional text (or bytes) to write into the file.
    '''
    ( fd, path ) = tempfile.mkstemp( suffix=extension if extension else '.tmp' )
    try:
        if contents:
            # os.write() takes bytes; text is written as UTF-8
            if not isinstance( contents, bytes ):
                contents = contents.encode( 'utf-8' )
            os.write( fd, contents )
    finally:
        os.close( fd )
    return path

def readTemporaryFileAsString( path=None ):
    ''' Return the whole contents of the file at path ('' if no path or on failure). '''
    if not path:
        return ''
    try:
        with open( path, 'r' ) as f:
            return f.read()
    except Exception as e:
        print( 'I/O operation failed on temporary file: %s' % e )
        return ''

def eraseTemporaryFile( path=None ):
    ''' Remove the file at path; do nothing if no path is given. '''
    if not path:
        return
    os.remove( path )

def turnOnPrinting( enable=False ):
    ''' Switch the output of printOrNot() and printTestCaseName() on or off. '''
    global PRINT    # without this the assignment only creates a local
    PRINT = enable

def printOrNot( thing='' ):
    ''' Print thing if printing is turned on. '''
    if not PRINT:
        return
    print( thing )

CONSOLE_WIDTH = 80

def printTestCaseName( functionName ):
    '''
    Call thus: printTestCaseName( sys._getframe().f_code.co_name )
    --helps you sort through unit test output by creating a banner.
    '''
    if not PRINT:
        return
    banner = '\nRunning test case %s ' % functionName
    # fill out with dashes up to the console width (none if already wider)
    dashes = '-' * max( 0, CONSOLE_WIDTH - len( banner ) )
    sys.stdout.write( banner + dashes + '\n' )
    sys.stdout.flush()
Python data structure-to-XML serialization serializes:
{ 'documents': [ { 'formats': [ { 'info': { 'uri': 'http://www.python.org/newness-of-python.pdf', 'pages': '245' }, 'type': 'pdf' }, { 'info': { 'uri': 'http://www.python.org/newness-of-python.html' }, 'type': 'web' } ], 'copyright': { 'url': 'http://www.creativecommons.org/', 'date': 'June 24, 2009', 'type': 'CC' }, 'title': 'The Newness of Python', 'date': 'June 6, 2009', 'text': [ 'Python is very nice. Very, very nice.' ], 'author': 'John Doe' } ] }
...to
<documents> <document date="June 6, 2009" author="John Doe" title="The Newness of Python"> <copyright url="http://www.creativecommons.org/" date="June 24, 2009" type="CC" /> <text> Python is very nice. Very, very nice. </text> <formats> <format type="pdf"> <info uri="http://www.python.org/newness-of-python.pdf" pages="245" /> </format> <format type="web"> <info uri="http://www.python.org/newness-of-python.html" /> </format> </formats> </document> </documents>
While this is very nice, I struggle to conceive of a stable Java class that could represent this to such a point as for XStream to deserialize it. (This is germane because I was trying to think of a way to serialize a Python object to XML, deliver it to a servlet written in Java, then deserialize it to and consume it from a POJO.)
Here's the code (py2xml.py):
'''
Py2XML - Python to XML serialization

This code transforms a Python data structures into an XML document

__author__ = "David McCuskey"
__since__  = "16 June 2010"

Usage:
    serializer = Py2XML()
    xml_string = serializer.parse( python_object )
    print python_object
    print xml_string
'''
from xml.sax.saxutils import escape


def _text( value ):
    ''' Return value as text with &, < and > escaped for use as XML content. '''
    return escape( '%s' % ( value, ) )


def _attribute( value ):
    ''' Return value as escaped text fit for a double-quoted XML attribute. '''
    return escape( '%s' % ( value, ), { '"' : '&quot;' } )


class Py2XML():

    def __init__( self ):
        self.data = ""    # where we store the processed XML string

    def parse( self, pythonObj, objName=None ):
        '''
        processes Python data structure into XML string
        needs objName if pythonObj is a List

        :return: the XML string ("" for None), which is also kept in self.data.
        '''
        if pythonObj is None:
            return ""

        if isinstance( pythonObj, dict ):
            self.data = self._PyDict2XML( pythonObj )
        elif isinstance( pythonObj, list ):
            # we need name for List object
            self.data = self._PyList2XML( pythonObj, objName )
        elif objName is None:
            # no name to make a tag of: just the (escaped) text
            self.data = _text( pythonObj )
        else:
            self.data = "<%(n)s>%(o)s</%(n)s>" % { 'n':objName, 'o':_text( pythonObj ) }
        return self.data

    def _PyDict2XML( self, pyDictObj, objName=None ):
        '''
        process Python Dict objects
        They can store XML attributes and/or children

        Without objName only the children are returned (attributes need a tag).
        '''
        attributes = {}    # attribute key/value pairs
        childStr   = ""    # XML string of this level's children

        for k, v in pyDictObj.items():
            if isinstance( v, dict ):
                # child tags, with attributes
                childStr += self._PyDict2XML( v, k )
            elif isinstance( v, list ):
                # child tags, list of children
                childStr += self._PyList2XML( v, k )
            else:
                # tag could have many attributes, let's save until later
                attributes[ k ] = v

        if objName is None:
            return childStr

        # create XML string for attributes
        attrStr = "".join( " %s=\"%s\"" % ( k, _attribute( v ) ) for k, v in attributes.items() )

        # let's assemble our tag string
        if childStr == "":
            return "<%(n)s%(a)s />" % { 'n':objName, 'a':attrStr }
        return "<%(n)s%(a)s>%(c)s</%(n)s>" % { 'n':objName, 'a':attrStr, 'c':childStr }

    def _PyList2XML( self, pyListObj, objName=None ):
        '''
        process Python List objects
        They have no attributes, just children
        Lists only hold Dicts or Strings (other values are written as their text)
        '''
        childStr = ""    # XML string of children

        # here's some Magic
        # we're assuming that List parent has a plural name of child:
        # eg, persons > person, so cut off last char
        # name-wise, only really works for one level, however
        # in practice, this is probably ok
        # (no name at all: the children are written without a tag of their own)
        childName = objName[ :-1 ] if objName else None

        for childObj in pyListObj:
            if isinstance( childObj, dict ):
                childStr += self._PyDict2XML( childObj, childName )
            elif isinstance( childObj, list ):
                childStr += self._PyList2XML( childObj )
            else:
                childStr += _text( childObj )

        if objName is None:
            return childStr
        return "<%(n)s>%(c)s</%(n)s>" % { 'n':objName, 'c':childStr }


def main():
    python_object =\
    {
        'documents':
        [
            {
                'formats':
                [
                    {
                        'info': { 'uri': 'http://www.python.org/newness-of-python.pdf', 'pages': '245' },
                        'type': 'pdf'
                    },
                    {
                        'info': { 'uri': 'http://www.python.org/newness-of-python.html' },
                        'type': 'web'
                    }
                ],
                'copyright': { 'url': 'http://www.creativecommons.org/', 'date': 'June 24, 2009', 'type': 'CC' },
                'title': 'The Newness of Python',
                'date': 'June 6, 2009',
                'text': [ 'Python is very nice. Very, very nice.' ],
                'author': 'John Doe'
            }
        ]
    }
    serializer = Py2XML()
    xml_string = serializer.parse( python_object )
    print( python_object )
    print( xml_string )

if __name__ == '__main__':
    main()