Friday, February 17, 2017

Sharing a Python generator across multiple multiprocessing processes.

Sometimes I need to do some work on a seemingly endless set of data, and I often use generators to create that endless data.  Say, for example, I was trying to brute-force crack a zip file's password.  I'd create a generator that methodically and continually creates new passwords for the cracking code to use in attempting to unzip the file.

Once I have my generator in place, I have a problem.  What if I want to spread this brute-force attack across all the cores in my system to speed up this slow, endless effort?  One would think that the multiprocessing module would solve the problem.

Well, no.  Generators aren't natively shared across processes in Python; they're copied to each process.  This means that if you try to use the generator with multiprocessing, each process gets its own copy of the generator, and each copy yields the same sequence of values.

To solve that, I devised a quick and dirty solution:  place the generator in its own process, and then have each worker process request the next value from it via inter-process communication using the multiprocessing Pipe class.

To that end, here's an example.  I post this here mainly to jog my memory next time I need to do this, but if you find it useful, great!
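
Here's a minimal sketch of the approach (a reconstruction rather than the exact code I used; the names password_generator, generator_server, and worker are just for illustration, and the generator itself is a toy):

#!/usr/bin/env python
import itertools
import multiprocessing


def password_generator():
    """Toy stand-in for an endless, methodical password generator."""
    chars = 'abcdefghijklmnopqrstuvwxyz'
    length = 1
    while True:
        for combo in itertools.product(chars, repeat=length):
            yield ''.join(combo)
        length += 1


def generator_server(conn):
    """Owns the generator; hands out the next value on each request."""
    gen = password_generator()
    while True:
        conn.recv()                 # block until a worker asks for a value
        conn.send(next(gen))


def worker(worker_id, conn, lock, attempts):
    """Requests values from the generator process and 'works' on them."""
    for _ in range(attempts):
        with lock:                  # keep each request/response pair atomic
            conn.send('next')
            candidate = conn.recv()
        # ... try the candidate password here ...
        print('worker %d trying %s' % (worker_id, candidate))


if __name__ == '__main__':
    server_conn, worker_conn = multiprocessing.Pipe()
    lock = multiprocessing.Lock()   # the workers share one end of the pipe

    server = multiprocessing.Process(target=generator_server,
                                     args=(server_conn,))
    server.daemon = True            # let it die with the main process
    server.start()

    workers = [multiprocessing.Process(target=worker,
                                       args=(i, worker_conn, lock, 5))
               for i in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

The lock matters because all the workers share a single end of the Pipe; holding it across the send/recv pair keeps one worker from grabbing a value that another worker requested.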


Wednesday, February 8, 2017

Non-freeze, single file distribution of your python project

In my job, I often have to share/distribute Python code to others on my development team or to others in sibling development teams.  These folks are technical and I can easily rely on them to have a reasonably contemporary version of Python already installed.  Thus I don't really want to completely freeze my Python project (via cxFreeze, py2app, py2exe, etc.).  Nor do I want to send my coworkers through the 'create a virtualenv, pip install -r, etc.' routine.

I want a middle-ground.  I want to just send them a file, knowing that it contains all the required dependencies and will just run, assuming they have Python in their path.

I've known for a while that if you zip up a directory full of Python files, and the root of the zip file contains a __main__.py file, you can run the zip file directly via python {filename.zip}.

What I didn't know is that you can concatenate a text file containing the Python shebang ('#!/usr/bin/env python') with such a zip file, and the resulting file is runnable.

#!/usr/bin/env python
PK^C^D^T^@^B^@^H^@nLHJb±gL<94>^A^@^@Ò^B^@^@^K^@^\^@__main__.pyUT    ^@^C¯,<9b>Xö,<9b>Xux^K^@^A^D«<82><90>~^D^T^@^@^@u<92>ÑkÛ0^PÆßýWܲ^GÙÐ$},^EÃÂ(<85><90>ìq ^Tûâ<88>Z<92>9<9d>º<95>±ÿ}';ÎJ`Â^O<96>ôéû~ºÓ
ÈY©1^F<93>9£4@^Q&<81><8e>O^X<88>2#tCrÌ^N0©¸<83>ÛÑw°ÌÕÂ#®Ë¾^A^PÅ >«¼h@^P;c6<8d>"^Âý=^\AB!ej<91>  <9e>g¦ñ<95>Vgé<87>ÌqÏ^RC<8a>9<99>aq^O^O<9f>Cv^U^AþXQi<88>ÛÌ®°Üx<90>µó<98>^Gl­ ´Z^B<81>^E·¾&^B^C<83>ÕÚ§
,g^LÃ=kW^Y$r­â Æ]HÉ^UÝ5AÂ[V^FC$y<80>¢a<82>y^V^U{°(Ô<9b>^XW}¤h<8f>aØ}¦^G>o<96>êu<9c>e?k<8e>,àbºÔ/ýëh´¶^LÇp^A^Ye<8b>LÕBt<85>w£hi¿&(^HÛîä<96>tÿ.^@A>
dJuëX
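
For the record, the same concatenation can be done from Python itself.  Here's a minimal sketch (the file names payload.zip and myapp are placeholders I made up):

import os
import stat
import zipfile

# Build the payload zip; __main__.py must sit at the root of the archive.
with zipfile.ZipFile('payload.zip', 'w', zipfile.ZIP_DEFLATED) as zf:
    zf.write('__main__.py')
    # zf.write('mypackage/util.py')  # ...plus any other project files

# Prepend the shebang line to the zip data.
with open('myapp', 'wb') as out:
    out.write(b'#!/usr/bin/env python\n')
    with open('payload.zip', 'rb') as payload:
        out.write(payload.read())

# Mark the result executable; './myapp' (or 'python myapp') now runs __main__.py.
os.chmod('myapp', os.stat('myapp').st_mode |
         stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)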

I stumbled upon a tool called pex, which automates generating these shebang/zip files.  While I found pex useful, it seemed like too much for my needs.  So I started cobbling together my own shebang/zip files by hand as I needed them.

Ultimately, I wanted to automate this a bit, so I ended up creating a simple bash script (which I named 'compile.sh') to speed things up.  Here's that script.  Hope you find it useful.

#!/bin/bash
#
# compile.sh

# Creates a self-runnable, single-file deployable for your python project.
# Your python project's main entry point must be in a file named __main__.py.
# It will embed in the deployable every dependency you've pip-installed in
# your virtualenv .env directory.
#
# The deployable still requires that python is installed on the target system.

if [[ ! -d .env/lib/python2.7/site-packages ]]; then
    echo "This script must be run from a virtualenv root."
    exit 1
fi

if [[ ! -f __main__.py ]]; then
    echo "This will only work if you have a __main__.py file."
    exit 1
fi

if [[ -z "$1" ]]; then
    echo "Usage: $(basename $0) {output_filename}"
    exit 1
fi

TMPDIR="$(mktemp -u /tmp/python.compile.XXXXXXX)"
CURDIR="$(pwd)"
TARGET="$1"

# Create payload zip with our code (remove extra junk)
mkdir -p "$TMPDIR/deps"
zip -9 "$TMPDIR/payload.zip" *
zip -d "$TMPDIR/payload.zip" compile.sh
zip -d "$TMPDIR/payload.zip" requirements.txt
zip -d "$TMPDIR/payload.zip" "$TARGET" > /dev/null

# Gather the virtualenv packages and clean them up
cp -R .env/lib/python2.7/site-packages/* "$TMPDIR/deps"
find "$TMPDIR/deps" -iname '*.pyc' -delete
find "$TMPDIR/deps" -ipath '*pip*' -delete
find "$TMPDIR/deps" -ipath '*easy_install*' -delete
find "$TMPDIR/deps" -ipath '*dist-info*' -delete

# Add the virtualenv packages to the payload
cd "$TMPDIR/deps"
zip -9 -r ../payload.zip *
cd "$CURDIR"

# Assemble the payload into a runnable file
echo '#!/usr/bin/env python' | cat - "$TMPDIR/payload.zip" > "$TARGET"
chmod +x "$TARGET"

# Cleanup
rm -r "$TMPDIR"

Wednesday, October 7, 2015

Python Decorators Examples

I often go long stretches between times when I aggressively use Python at clients.  As a result, I get rusty on what I'd consider somewhat advanced topics and have to spend time googling around to jog my memory.  Reading my own code usually gets me back up and running quicker than reading others' code.  I'm leaving this here as a breadcrumb to help me remember how to use Python decorators.

If you find this helpful, awesome!

#!/usr/bin/env python
"""Decorator examples"""

def decorator_without_arguments(func):
    def wrapped_function(*args, **kwargs):
        """Reverses the arguments before calling"""
        new_args = reversed(args)
        return func(*new_args, **kwargs)
    wrapped_function.__name__ = "(%s -> %s)" % \
        (decorator_without_arguments.__name__, func.__name__)
    return wrapped_function

def decorator_with_arguments(arg1, arg2=None):
    def actual_decorator(func):
        def wrapped_function(*args, **kwargs):
            """Wraps results in explanation string"""
            result = "(%s: %s) " % (arg1, ', '.join([str(x) for x in args]))
            result += str(func(*args, **kwargs))
            if arg2 is not None:
                result += arg2
            return result
        wrapped_function.__name__ = func.__name__
        return wrapped_function
    return actual_decorator


def undecorated_subtract(number_1, number_2):
    return number_1 - number_2

@decorator_without_arguments
def decorated_subtract(number_1, number_2):
    return number_1 - number_2

def undecorated_add(number_1, number_2):
    return number_1 + number_2

@decorator_with_arguments('ADD')
def decorated_add(number_1, number_2):
    return number_1 + number_2


if __name__ == '__main__':
    print undecorated_subtract(7, 3)
    print decorated_subtract(7, 3)
    print decorated_subtract
    print undecorated_add(3, 7)
    print decorated_add(3, 7)
    print decorated_add
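
For reference, running this script under Python 2 prints something like the following (the exact function addresses will differ):

4
-4
<function (decorator_without_arguments -> decorated_subtract) at 0x...>
10
(ADD: 3, 7) 10
<function decorated_add at 0x...>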

Wednesday, July 23, 2014

Embedding python in bash scripts

As a software development consultant, I do a lot of bash scripting.  I can do a lot of really creative things using nothing but bash and the binutils at my disposal, but sometimes I'll come across something that's just easier to do in a higher level scripting language.  Enter python.

Conversely, there are a lot of things that are just easier and more straightforward to do in bash, so writing everything in straight python may be more work than it's worth.

Here's a quick posting to describe how you can embed some python code into your bash scripts and get the best of both worlds.  Note: just as a heads-up, the examples in this posting are quite contrived.

Calling python from bash is easy.  You simply use python's '-' argument and pipe in your python code.  I typically wrap my python code in a bash function.

#!/bin/bash

function current_datetime {
python - <<END
import datetime
print datetime.datetime.now()
END
}

# Call it
current_datetime

# Call it and capture the output
DT=$(current_datetime)
echo Current date and time: $DT

You can also pass data into your embedded python script.  I do that using environment variables:

#!/bin/bash

function line {
PYTHON_ARG="$1" python - <<END
import os
line_len = int(os.environ['PYTHON_ARG'])
print '-' * line_len
END
}

# Do it one way
line 80

echo 'Handy'

# Do it another way
echo $(line 80)

My usual use-case for doing this is if I'm extending someone else's bash scripts and have to 'go off the reservation' a bit.  Sometimes I'm updating an existing 'legacy' script and need to look up some data... maybe do a REST call or something.  Here's an example bash script that uses curl to call a REST service to get some weather data.  Then it passes the raw JSON response to an embedded python script to interpret and format the results:

#!/bin/bash

function format_weather_data() {
PYTHON_ARG="$1" python - <<END
import os
import json

json_data = os.environ['PYTHON_ARG']
data = json.loads(json_data)
lookup = {
    '200': 'thunderstorm with light rain',
    '201': 'thunderstorm with rain',
    '202': 'thunderstorm with heavy rain',
    '210': 'light thunderstorm',
    '211': 'thunderstorm',
    '212': 'heavy thunderstorm',
    '221': 'ragged thunderstorm',
    '230': 'thunderstorm with light drizzle',
    '231': 'thunderstorm with drizzle',
    '232': 'thunderstorm with heavy drizzle',
    '300': 'light intensity drizzle',
    '301': 'drizzle',
    '302': 'heavy intensity drizzle',
    '310': 'light intensity drizzle rain',
    '311': 'drizzle rain',
    '312': 'heavy intensity drizzle rain',
    '313': 'shower rain and drizzle',
    '314': 'heavy shower rain and drizzle',
    '321': 'shower drizzle',
    '500': 'light rain',
    '501': 'moderate rain',
    '502': 'heavy intensity rain',
    '503': 'very heavy rain',
    '504': 'extreme rain',
    '511': 'freezing rain',
    '520': 'light intensity shower rain',
    '521': 'shower rain',
    '522': 'heavy intensity shower rain',
    '531': 'ragged shower rain',
    '600': 'light snow',
    '601': 'snow',
    '602': 'heavy snow',
    '611': 'sleet',
    '612': 'shower sleet',
    '615': 'light rain and snow',
    '616': 'rain and snow',
    '620': 'light shower snow',
    '621': 'shower snow',
    '622': 'heavy shower snow',
    '701': 'mist',
    '711': 'smoke',
    '721': 'haze',
    '731': 'sand, dust whirls',
    '741': 'fog',
    '751': 'sand',
    '761': 'dust',
    '762': 'volcanic ash',
    '771': 'squalls',
    '781': 'tornado',
    '800': 'clear sky',
    '801': 'few clouds',
    '802': 'scattered clouds',
    '803': 'broken clouds',
    '804': 'overcast clouds',
    '900': 'tornado',
    '901': 'tropical storm',
    '902': 'hurricane', 
    '903': 'cold',
    '904': 'hot',
    '905': 'windy',
    '906': 'hail',
    '950': 'setting',
    '951': 'calm',
    '952': 'light breeze',
    '953': 'gentle breeze',
    '954': 'moderate breeze',
    '955': 'fresh breeze',
    '956': 'strong breeze',
    '957': 'high wind, near gale',
    '958': 'gale',
    '959': 'severe gale',
    '960': 'storm',
    '961': 'violent storm',
    '962': 'hurricane',
}

print "Current temperature: %g F" % data['main']['temp']
print "Today's high: %g F" % data['main']['temp_max']
print "Today's low: %g F" % data['main']['temp_min']
print "Wind speed: %g mi/hr" % data['wind']['speed']
weather_descs = [lookup.get(str(i['id']), '*error*') for i in data['weather']]
print "Weather: %s" % ', '.join(weather_descs)

END
}

WEATHER_URL="http://api.openweathermap.org/data/2.5/weather?q=Cincinnati,OH&units=imperial"

format_weather_data "$(curl -s $WEATHER_URL)"

Hope you find this information useful. 

Wednesday, January 22, 2014

Formatting GUIDs with sed

At my current client, I'm often given GUIDs for one reason or another.  Often they are not in the format that I need them to be.  Sometimes they have the dashes and I need them to be dash-free.  Other times, the dashes have been removed and I need them back.

Here's some quick, down and dirty sed commands to swap them back and forth:

To remove the dashes:

$ echo 8EC60070-685F-41DB-C881-EACF9E74E4BD | sed 's/-//g'
8EC60070685F41DBC881EACF9E74E4BD
$

To put them back:

$ echo 8EC60070685F41DBC881EACF9E74E4BD | sed -rn 's/([0-9A-F]{8})([0-9A-F]{4})([0-9A-F]{4})([0-9A-F]{4})([0-9A-F]{12})/\1-\2-\3-\4-\5/p'
8EC60070-685F-41DB-C881-EACF9E74E4BD
$

It goes without saying that you can embed these sed commands in a bash script and run them over many rows in a file.

Thursday, March 14, 2013

Monitoring a web page for changes using bash

There's this conference that I'd like to attend and I've heard that it's a hard-to-get-into type conference.  When I go to their site it doesn't have any new info.

Rather than checking the site every day, I'd like to have it monitored and be alerted when something new DOES appear on it.

Now I know there are services like ChangeDetection.com that can monitor it for me, but I was wanting to cobble something together with the tools I already have.  I'd also like the ability to customize what it considers "a change" when/if I need it.

To that end, I threw together the following bash script.  It monitors a URL and if it detects a change, it sends an email to my gmail account letting me know.

Hope you find it useful.  BTW, I'm using a program called sendEmail to send the email notification.  It's in apt if you're using a debian/ubuntu-like distribution.

#!/bin/bash

# monitor.sh - Monitors a web page for changes
# and sends an email notification if the page changes

USERNAME="me@gmail.com"
PASSWORD="itzasecret"
URL="http://thepage.com/that/I/want/to/monitor"

for (( ; ; )); do
    mv new.html old.html 2> /dev/null
    curl $URL -L --compressed -s > new.html
    DIFF_OUTPUT="$(diff new.html old.html)"
    if [ "0" != "${#DIFF_OUTPUT}" ]; then
        sendEmail -f $USERNAME -s smtp.gmail.com:587 \
            -xu $USERNAME -xp $PASSWORD -t $USERNAME \
            -o tls=yes -u "Web page changed" \
            -m "Visit it at $URL"
    fi
    sleep 10    # always pause between checks so we don't hammer the site
done

Then from a bash prompt I run it with the following command:

nohup ./monitor.sh &

Using nohup and throwing it in the background allows me to log out and have the script continue to run.

Monday, February 4, 2013

Fun with Python's multiprocessing module

At my current client, they asked me to write a Python script to aggregate data from a RESTful web service.  Essentially, they have a web service end-point that takes a single customer ID as an argument and it returns some customer profile records as JSON.

They wanted my script to call the web service for each and every customer listed in a text file.  The script will be run from a scheduled job (cron, windows scheduled task, etc.).  Once productionalized, the text file will contain a large number of customer ID's (like a million or so).  All the results need to be stored in a single text file for the run.

My first thought was to use Python's threading features, but I came up with 2 problems with this approach:

  • Python's threading isn't "real" parallelism.  Because of the Global Interpreter Lock, a single Python process only executes one thread's bytecode at a time, regardless of how many cores the server has, so the other cores go largely unused.
  • Python's urllib and urllib2 packages aren't even thread-safe.
Now I know there's a third-party thread-safe urllib3 package, but that doesn't solve the multi-core issue and I wasn't looking to install anything beyond Python 2.7's standard library on the server.  After doing a little research, I came up with the idea of using Python's multiprocessing module.

The rest of this blog posting is a walkthrough of what I ended up building.

First things first - configuration

Since this is a script that will be run often, and certain configuration settings will need to be updated from time to time by support staff, I decided to extract the configuration out to a separate configuration file.  Initially, I had the settings stored in a .ini file, but I later thought: why not use Python for the settings?  To that end, I have a settings.py file that holds a class containing the settings:

#!/usr/bin/env python
"""
Settings file.  See job.py for more info.
"""

# -----------------------------------------------------------
# Edit the values in this Settings class to control settings.
# -----------------------------------------------------------

class Settings:
    """Settings as a class.  Why?  Because I'm lazy."""

    source_file = 'TestData.csv'

    profile_data_file = 'results.txt'

    log_file = 'log.txt'

    failed_customer_file = 'failed.txt'

    process_count = 10

    url = 'http://division-1.internal-server.com/rest-service/endpoint/{0}'

There is one input file (source_file).  This is the file that contains the list of customer ID's.  Then there are 3 output files:  profile_data_file, log_file and failed_customer_file.  The profile_data_file is where the script will put the customer detail records that it receives from the web service.  The log_file is where the script will write logging messages for debugging problems after the fact.  Finally, failed_customer_file is where the script will write the ID's of customers that the script fails to retrieve from the web service.

process_count is a setting that will allow you to specify how many parallel processes the script should spawn.

url is the url of the web service end-point.

The main job script is called job.py.  It imports the settings.py module and then creates a new class that inherits from the Settings class.
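
For completeness, the snippets below assume job.py starts with roughly these imports (Python 2 module names from the standard library, plus the settings module):

import datetime
import json
import multiprocessing
import os
import Queue
import urllib2

import settings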

class Config(settings.Settings):
    """Configuration class"""

    _runtime = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
   
    @staticmethod
    def timestamp(filename):
        """Stamps a filename with the timestamp for this run"""
        parts = os.path.splitext(filename)
        return "%s-%s%s" % (parts[0], Config._runtime, parts[1])

This class has a static method called timestamp.  In the main job, I will call the timestamp method to add the date/time of the run to the output filenames.  So you might see something like:

outfile = open(Config.timestamp(Config.log_file), 'w')

This will open a file named something like 'log-20130204-112530.txt' for writing.

The Job class

The Job class is the main class that executes the job and spawns the worker processes.  It has the following methods:
  • __init__ - constructor, sets up a multiprocessing Pool and three Queues
  • run - Kicks off the job
  • get_customers - Reads the customer file
  • get_requests - Transforms the customers list into a list of requests for the worker processes
  • process_customer_queue - Deals with responses from the web service
  • process_log_queue - Deals with log messages from the worker processes
  • process_exceptions_queue - Deals with exceptions that occur in the worker processes
  • log - Writes log messages to the log file
Here's a copy of the Job class:

class Job:
    """Job that performs the run."""

    def __init__(self):
        self._manager = multiprocessing.Manager()
        self._pool = multiprocessing.Pool(Config.process_count)
        self._customer_queue = self._manager.Queue()
        self._log_queue = self._manager.Queue()
        self._exception_queue = self._manager.Queue()

    def run(self):
        """Do it."""
        start_time = datetime.datetime.now()
        self.log('[Run started]')
        customers = self.get_customers()
        requests = self.get_requests(customers)
        result = self._pool.map_async(get_customer, requests)
        while not result.ready():
            self.process_customer_queue()
            self.process_log_queue()
            self.process_exceptions_queue()
        self._pool.close()
        self._pool.join()
        self.process_customer_queue()
        self.process_log_queue()
        self.process_exceptions_queue()
        self.log('[Run finished]')
        self.log('[Total runtime: %s]' % (datetime.datetime.now() - start_time))

    def get_customers(self):
        """Read the source file."""
        buf = [line.strip() for line in open(Config.source_file).readlines()[1:]]
        return buf

    def get_requests(self, customers):
        """Generate requests from customers."""
        requests = [{ \
            'customer': customer, \
            'data': self._customer_queue, \
            'log': self._log_queue, \
            'exceptions': self._exception_queue, \
            'url': Config.url, \
        } for customer in customers]
        return requests

    def process_customer_queue(self):
        """Pull messages off the data queue."""
        try:
            message = self._customer_queue.get_nowait()
        except Queue.Empty:
            return
        customer = message['customer']
        details = message['server_response']['details']
        outfile = open(Config.timestamp(Config.profile_data_file), 'a')
        for record in details:
            buf = "%s, %s, %s, %s\n" % (customer, record['id'], \
                record['relevanceScore'], record['relevanceRank'])
            outfile.write(buf)
        outfile.close()
            
    def process_log_queue(self):
        """Pull messages off the log queue."""
        try:
            message = self._log_queue.get_nowait()
        except Queue.Empty:
            return
        self.log(message)

    def process_exceptions_queue(self):
        """Pull messages off the exceptions queue."""
        try:
            message = self._exception_queue.get_nowait()
        except Queue.Empty:
            return
        customer = message['customer']
        exception = message['exception']
        self.log("EXCEPTION GETTING %s! - %s" % \
            (customer, str(exception)))
        failed_file = open(Config.timestamp(Config.failed_customer_file), 'a')
        buf = "%s\n" % customer
        failed_file.write(buf)
        failed_file.close()

    def log(self, message):
        """Write message to the log file."""
        logfile = open(Config.timestamp(Config.log_file), 'a')
        timestamp = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
        logfile.write("%s - %s\n" % (timestamp, message))
        logfile.close()

Some important things to highlight.  The constructor creates three queues:  customer_queue, log_queue, and exceptions_queue.  These queues are used for interprocess communication.  The constructor creates a Pool of worker processes.  Each process will communicate back to the parent process via these queues.  The customer_queue is responsible for holding the results of the web services calls.  The log_queue is responsible for holding log messages.  The exceptions_queue is responsible for holding exceptions that occur when worker processes have problems talking to the web service.

In the run method, the Job class will call the Pool class' map_async method.  The map_async method splits the requests list into multiple lists and passes each list to a separate worker process for processing. Then the run method goes into a loop, waiting for the worker processes to all complete.  While it's waiting, it will continually check the queues to see if any messages have been received.

The stand-alone function

The map_async method's first argument is the function that each worker process should run against the items in the requests list.  In this case, that function is get_customer.  get_customer looks like this:

def get_customer(request):
    """Retrieve the customer details from the given request."""
    customer = request['customer']
    url = request['url']
    data = request['data']
    log = request['log']
    exceptions = request['exceptions']
    try:
        log.put("Requesting details for customer %s" % customer)
        req_url = url.replace('{0}', customer)
        request = urllib2.Request(req_url)
        response = urllib2.urlopen(request)
        buf = response.read()
        data_records = json.loads(buf)
        data.put({'customer': customer, 'server_response': data_records})
        log.put("Successfully retrieved %d details(s) for %s" % \
            (len(data_records), customer))
    except Exception, exc:
        exceptions.put({'customer': customer, 'exception': exc})

This function is called for each request in the worker processes.  It extracts queues from the request.  It uses the put method on each queue to pass information back to the parent process.

Pulling it all together

The last thing in my script file is this:

if __name__ == '__main__':
    # Let's do this!
    Job().run()

It's important to embed the Job().run() call inside the if __name__ == '__main__' branch.  This is because of how the multiprocessing internals work.  When the Pool is created, Python spawns a number of worker subprocesses.  On Windows (and whenever the spawn start method is used), each of those workers starts up as a fresh copy of the Python interpreter, imports the job module, and then starts calling the get_customer function over and over with the requests.  If the Job().run() call were not protected by that branch, every worker process that imported the module would kick off its own Job.  Nothing good would come of that!

Hope you find this helpful.