Speeding up an ArcPy script using the Python multiprocessing module

ArcPy is ESRI’s interface to its scripted geoprocessing capabilities. While I am personally much more inclined to use the open-source OGR/GDAL tool stack, I am stuck with ESRI, ArcGIS and ArcPy in my current professional position. One of my recent projects required analyzing a number of point features with regard to nearby line features: for each point, the line feature with the smallest distance to it had to be retrieved.

A corresponding algorithm could look like this:

  1. Select a point feature.
  2. Create a buffer around the feature using a base size.
  3. Collect all line features that are situated within the selected buffer.
  4. If no line features were found, increase the buffer size and repeat step 3 until one or more line features have been found.
  5. From the collected line features identify the one with the smallest distance to the selected point feature.

The implementation of this process is straightforward.

#!/usr/bin/env python
import sys
import arcpy

sde_connection_file = "sde_connection.sde"

point_src_dataset = "FDS_POINT"
point_src_featureclass = "FCL_POINT"

line_src_dataset = "FDS_LINE"
line_src_featureclass = "FCL_LINE"

buffer_base_increment = 10

point_lyr_src = '\\'.join((sde_connection_file, point_src_dataset, point_src_featureclass))
line_lyr_src = '\\'.join((sde_connection_file, line_src_dataset, line_src_featureclass))


def find_nearest_line(point_lyr_src, line_lyr_src):
    # creating ESRI feature layers
    point_lyr = arcpy.management.MakeFeatureLayer(point_lyr_src, "point_layer")
    line_lyr = arcpy.management.MakeFeatureLayer(line_lyr_src, "line_layer")

    # retrieving OID and shape field names for point feature layer
    oid_fieldname = arcpy.Describe(point_lyr).OIDFieldName
    shape_fieldname = arcpy.Describe(point_lyr).shapeFieldName

    # setting up container for resulting pairs of points and least-distance lines
    pnt_min_dist_line_pairs = list()
    
    with arcpy.da.SearchCursor(point_lyr, ["OID@", "SHAPE@"]) as point_cursor:
        for poid, geom in point_cursor:
            print "Working on point feature %d" % poid
            # setting up a where clause to create a feature layer that solely contains the current point of interest
            poi_where_clause = "%s = %d" % (oid_fieldname, poid)
            # creating a feature layer that will be used as the foundation for the distance calculation
            poi_lyr = arcpy.management.MakeFeatureLayer(point_lyr, "poi_layer", poi_where_clause)
    
            buffer_radius = 0
            nearby_lines_cnt = 0
            i = 1
            
            while not nearby_lines_cnt:
                # growing the buffer radius by an increasing multiple of the
                # base size, i.e. 10, 30, 60, 100, ... for a base size of 10
                buffer_radius += i * buffer_base_increment
                # finding all line elements that are within a distance of
                # buffer radius from the point of interest
                nearby_lines_lyr = arcpy.management.SelectLayerByLocation(line_lyr, "WITHIN_A_DISTANCE", poi_lyr, buffer_radius)
                nearby_lines_cnt = int(arcpy.management.GetCount(nearby_lines_lyr).getOutput(0))
                i += 1

            print "Number of line features found within %d m of point feature %d: %d" % (buffer_radius, poid, nearby_lines_cnt)
            
            # retrieving point geometry
            point_geom = arcpy.PointGeometry(geom.getPart())
    
            # setting up minimal distance variables
            minimal_distance = sys.maxsize
            minimal_distance_line_oid = None
    
            # from all found lines within buffer radius find the one with the
            # smallest distance to the point of interest feature
            with arcpy.da.SearchCursor(nearby_lines_lyr, ["OID@", "SHAPE@"]) as line_cursor:
                for loid, lgeom in line_cursor:
                    line_geom = arcpy.Polyline(lgeom.getPart())
                    distance = point_geom.distanceTo(line_geom)
    
                    if distance < minimal_distance:
                        minimal_distance = distance
                        minimal_distance_line_oid = loid

            print "Minimum distance calculated between point feature %d and line feature %d: %0.2f" % (poid, minimal_distance_line_oid, minimal_distance)
            
            arcpy.management.Delete(poi_lyr)

            pnt_min_dist_line_pairs.append((poid, minimal_distance_line_oid, minimal_distance))
                        
            print "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"

    return pnt_min_dist_line_pairs

if __name__ == '__main__':
    
    point_with_least_distance_lines = find_nearest_line(point_lyr_src, line_lyr_src)

The problem with this solution is that, depending on the number of point features to be analyzed, it can be painfully inefficient, as we process one item at a time in a linear fashion. This may not be problematic if the overall number of items is small, but real-world data tends not to be small. In my case several thousand features had to be considered, leading to a processing time of multiple hours. While optimization is something that should not be done prematurely, I regarded this situation as definitely worth looking into in more detail.

A very common approach to optimizing a piece of software is to split the complete set of tasks into smaller subsets and to process several of those subsets simultaneously. This approach has become particularly viable with the advent of multi-core hardware, which seems predestined to be used in this manner. From a conceptual viewpoint it is necessary that the individual subproblems do not interfere with each other. This is clearly the case in the described setup, as the minimum distance between one point and a set of line features is obviously independent of any other point feature.

Regarding the point features at hand, we will create subsets by splitting the complete dataset into parts that are no larger than a certain cutoff size. An existing answer to this very question presents a solution that already fits my needs:

def chunks(l, n):
    u"""
    Yields successive n-sized chunks from l, with the last chunk containing
    the remaining m (m <= n) elements.
    """
    for i in xrange(0, len(l), n):
        yield l[i : i + n]
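
For a quick illustration (the input list here is made up), the generator defined above can be exercised like this:

oids = [1, 2, 3, 4, 5, 6, 7, 8, 9]
for chunk in chunks(oids, 4):
    print chunk
# prints [1, 2, 3, 4], then [5, 6, 7, 8] and finally [9]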

This generic function may now be applied to the actual set of input features:

def get_equal_parts(lyr, size):
    u"""
    Creates oid ranges with given maximum size from specified layer.
    """
    print "Creating oid ranges with maximum size %d" % size
    
    all_oids = list()
    
    with arcpy.da.SearchCursor(lyr, ["OID@"]) as cursor:
        for oid, in cursor:
            all_oids.append(oid)
    
    print "%d oids found overall" % len(all_oids)
    equal_parts = list(chunks(sorted(all_oids), size))
    print "%d oid ranges created" % len(equal_parts)
    return equal_parts

if __name__ == '__main__':
    equal_parts = get_equal_parts(point_lyr_src, 20)    

Please note that it is not sufficient to simply retrieve the minimum and maximum object IDs from the presented layer, as previous edits or selections may have created gaps between those values. We actually have to collect all available OIDs instead.
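
As a contrived example (all OID values are made up), chunking the sorted list of collected OIDs yields chunk boundaries that remain valid even across such gaps:

all_oids = [1, 2, 3, 4, 11, 12, 13, 27, 28]
for part in chunks(sorted(all_oids), 4):
    # the first and last element of each chunk serve as lower and
    # upper bounds for a where clause, gaps notwithstanding
    print part[0], part[-1]
# prints: 1 4, then 11 27, then 28 28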

Now, for Python, the multiprocessing module (among others) offers several options for implementing a parallel solution. One of them is to set up a pool of worker processes that are then assigned tasks. Here we will give each worker the task of retrieving the nearest lines for one of the previously created ranges of point features. To do so we first have to collect all the necessary data, i.e. the sources of the point and line layers as well as the bounds of the OID range:

# not shown above: the multiprocessing module has to be imported
# at the top of the script
import multiprocessing

if __name__ == '__main__':
    mp_func_data = list()
    for i, eq in enumerate(equal_parts, 1):
        print "part %2d - lower oid bound: %4d - upper oid bound: %4d - size: %2d" % (i, eq[0], eq[-1], len(eq))
        mp_func_data.append([point_lyr_src, line_lyr_src, eq[0], eq[-1]])

As you can see, we have combined all the necessary input data for one chunk into a list object before appending it to an overall list containing the input data for all chunks. This is by design, as mapping worker processes to a function only allows for a single argument. That is also why we have to adjust the definition of our worker function:

def find_nearest_line(func_data):
    # unpacking provided function data
    point_lyr_src, line_lyr_src, oid_lower_bound, oid_upper_bound = func_data

    # creating ESRI feature layers
    point_lyr = arcpy.management.MakeFeatureLayer(point_lyr_src, "point_layer")
    line_lyr = arcpy.management.MakeFeatureLayer(line_lyr_src, "line_layer")

    # retrieving OID and shape field names for point feature layer
    oid_fieldname = arcpy.Describe(point_lyr).OIDFieldName
    shape_fieldname = arcpy.Describe(point_lyr).shapeFieldName

    # setting up where clause to restrict selection to specified oid range
    where_clause = "%s >= %d AND %s <= %d" % (oid_fieldname, oid_lower_bound, oid_fieldname, oid_upper_bound)
    # re-create point feature layer
    arcpy.management.Delete(point_lyr)
    point_lyr = arcpy.management.MakeFeatureLayer(point_lyr_src, "point_layer", where_clause)
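    # ... the remainder of the function body is unchanged from the sequential
    # version shown above; it still returns the list of
    # (point oid, line oid, distance) tuples so that pool.map can collect it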

Finally, we create the worker pool and assign each worker a task by mapping the function for nearest-line retrieval over the prepared input data:

    # setting up pool of four workers
    pool = multiprocessing.Pool(4, None, None, 1)
    # setting up list for all results
    all_results = list()

    # mapping function for nearest line retrieval (with input data) to worker pool
    for worker_results in pool.map(find_nearest_line, mp_func_data):
        all_results.extend(worker_results)

    # closing pool and waiting for each task to be finished
    pool.close()
    pool.join()

The Pool constructor accepts several arguments. One may define an initialization function that each worker carries out after being created; this function may of course be provided with initialization arguments. Both of these are set to None in the case at hand. More interesting is the last argument, which is set to 1: it is the maximum number of tasks a single worker process will handle before it is terminated and a new one is spawned. As ArcPy functions are notorious for leaking memory, it is especially necessary to keep this number low, or we may end up with exceptions due to insufficient memory.
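
The effect of this last argument can be observed with a small standalone sketch that is independent of ArcPy (the whoami function is made up for this demonstration): with maxtasksperchild set to 1 and the default chunk size, each task is typically handled by a freshly spawned process, which shows up as a different process ID per task.

import multiprocessing
import os

def whoami(task_no):
    # report which worker process handled this task
    return task_no, os.getpid()

if __name__ == '__main__':
    # two workers, default initializer and arguments, one task per worker
    pool = multiprocessing.Pool(2, None, None, 1)
    for task_no, pid in pool.map(whoami, range(6)):
        print "task %d was handled by process %d" % (task_no, pid)
    pool.close()
    pool.join()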

Implementing multiprocessing for my use case has decreased processing time to a quarter of the previous duration. It is still in the range of hours, but at least considerably less than with the linear version of the program.

We will have a look at how analysis tasks like the one described above may be sped up by applying low-level database queries in a later post on the Portolan Blog.