1.6.6 Arcpy multiprocessing examples
Let's look at a couple of examples using ArcGIS functions. There are a number of caveats or gotchas when using multiprocessing with ArcGIS, and it is important to cover them up front because they could result in hundreds of Pro sessions opening and locking up your PC, and they affect the ways in which we can write our code.
Esri describe a number of best practices for multiprocessing with arcpy. These include:
- Use the "memory" (Pro) or "in_memory" (legacy, but still works) workspaces to store temporary results because, as noted earlier, memory is faster than disk.
- Avoid writing to file geodatabase (FGDB) data types and GRID raster data types. These data formats can often cause schema locking or synchronization issues. That is because file geodatabases and GRID rasters do not support concurrent writing – only one process can write to them at a time. You might have seen a version of this problem with arcpy previously if you tried to modify a feature class in Python while it was open in ArcGIS. That problem is magnified if you have an FGDB and you're trying to write many feature classes to it at once: even if all of the feature classes are independent, you can only write them to the FGDB one at a time.
So, bearing these two points in mind, we should make use of memory workspaces wherever possible and avoid writing to FGDBs (in our worker functions at least – we could still use them in our master function to merge a number of shapefiles, or even individual FGDBs, back into a single source).
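As a concrete illustration, a worker function might write its intermediate results to the memory workspace and only copy the final result to disk. The following is a minimal, hypothetical sketch of that pattern (the buffer operation and all paths are placeholders, not code from this lesson):

import arcpy

def worker(in_fc, out_fc):
    # intermediate result goes to the fast "memory" workspace instead of an FGDB on disk
    tmp = r"memory\buffered"
    arcpy.analysis.Buffer(in_fc, tmp, "1000 Meters")
    # only the final result is written to disk (e.g. one shapefile per worker)
    arcpy.management.CopyFeatures(tmp, out_fc)
    # remove the in-memory dataset so repeated calls don't collide
    arcpy.management.Delete(tmp)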
1.6.6.1 Multiprocessing with raster data
There are two types of operations with rasters that can easily (and productively) be implemented in parallel: operations that are independent components in a workflow, and raster operations which are local, focal, or zonal – that is, they work on a small portion of a raster such as a pixel or a group of pixels.
Esri's Clinton Dow and Neeraj Rajasekar presented on multiprocessing with arcpy back at the 2017 User Conference, and their slides contained a number of useful graphics illustrating these two categories of raster operations. We have reproduced those graphics here as they are still appropriate and relevant.
An example of an independent workflow would be calculating the slope, aspect, and some other derivatives of a raster and then producing a weighted sum or other statistic. Each of the operations is performed independently on our raster up until the final operation, which relies on each of them (see the first image below). Therefore, the independent operations can be parallelized and sent to workers, and the final task (which could also be done by a worker) aggregates or summarises the result. That is what we can see in the second image: each of the tasks is assigned to a worker (even though two of the workers are using a common dataset) and then Worker 4 completes the job. You can probably imagine a more complex version of this task where it is scaled up to process many elevation and land-use rasters, performing many slope, aspect and reclassification calculations, with the results being combined at the end.
An example of the second type of raster operation is a case where we want to make a mathematical calculation on every pixel in a raster, such as squaring or taking the square root. Each pixel is independent of its neighbors in this operation, so we could have multiple workers processing multiple tiles of the raster, with the results written to a new raster. In this example, instead of having a single core serially performing a square root calculation across the whole raster (see the first image below), we can segment our raster into a number of tiles, assign each tile to a worker, and then perform the square root operation for each pixel in the tile, writing the result to a single output raster, as shown in the second image below.
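To make the tiling idea concrete, here is a small, self-contained sketch using NumPy rather than arcpy (the array size and the number of tiles are invented for illustration): the raster is split into row bands, each band is handed to a worker that takes the square root of every pixel, and the bands are stitched back together at the end.

import numpy as np
import multiprocessing

def sqrt_tile(tile):
    # a local operation: every output pixel depends only on the corresponding input pixel
    return np.sqrt(tile)

if __name__ == "__main__":
    raster = np.random.rand(4000, 4000)        # stand-in for a raster read from disk
    tiles = np.array_split(raster, 8, axis=0)  # split into 8 row bands ("tiles")
    with multiprocessing.Pool() as pool:
        result_tiles = pool.map(sqrt_tile, tiles)
    result = np.vstack(result_tiles)           # reassemble the output raster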
Let's return to the raster coding example that we used to build our ArcGIS Pro tool earlier in the lesson. That simple example processed a list of rasters and completed a number of tasks on each raster. Based on what you have read so far, I expect that you have realized that this is also a pleasingly parallel problem.
Bearing in mind the caveats about parallel programming from above and the process that we undertook to convert the Cherry-O program, let's begin.
Our first task is to identify the parts of our problem that can work in parallel and the parts which we need to run sequentially.
The best place to start is with the pseudocode of the original task. If we have documented our sequential code well, this could be as simple as copying each line of documentation into a new file and working through the process. We can start with the text description of the problem, build our sequential pseudocode from there, and then create the multiprocessing pseudocode. It is very important to design our multiprocessing solutions correctly and carefully to ensure that they are as efficient as possible: the worker functions should receive the bare minimum of data they need to complete their tasks, use memory workspaces, and write as little data back to disk as possible.
Our original task was:
Get a list of raster tiles
For every tile in the list:
    Fill the DEM
    Create a slope raster
    Calculate a flow direction raster
    Calculate a flow accumulation raster
    Convert those stream rasters to polygon or polyline feature classes.
You will notice that I’ve formatted the pseudocode just like Python code with indentations showing which instructions are within the loop.
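To make that concrete, here is a simplified sketch of what the body of the loop looks like in arcpy (assuming the Spatial Analyst extension is available; the workspace path, raster name pattern, output names, and the stream threshold used with Con() are placeholders rather than the exact code of the earlier tool):

import arcpy
from arcpy.sa import Fill, Slope, FlowDirection, FlowAccumulation, Con, StreamToFeature

arcpy.CheckOutExtension("Spatial")
arcpy.env.workspace = r"C:\489\data"            # placeholder workspace
arcpy.env.overwriteOutput = True

for tile in arcpy.ListRasters("dem_*"):         # get a list of raster tiles
    base = arcpy.Describe(tile).baseName
    filled = Fill(tile)                         # fill the DEM
    slope = Slope(filled)                       # create a slope raster
    slope.save(f"slope_{base}.tif")
    flow_dir = FlowDirection(filled)            # calculate a flow direction raster
    flow_acc = FlowAccumulation(flow_dir)       # calculate a flow accumulation raster
    # derive a stream raster from the accumulation raster (the threshold is arbitrary)
    streams = Con(flow_acc > 1000, 1)
    StreamToFeature(streams, flow_dir, f"streams_{base}.shp")   # convert to polylines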
As this is a simple example, we can place all the functionality within the loop into our worker function, as it will be called for every raster. The list of rasters will need to be determined sequentially, and we'll then pass that to our multiprocessing function and let the map element of multiprocessing map each raster onto a worker to perform the tasks. We won't explicitly be using the reduce part of multiprocessing here, as the output will be a feature class, but reduce will probably tidy up after us by deleting temporary files that we don't need.
Our new pseudocode then will look like:
Get a list of raster tiles
For every tile in the list:
    Launch a worker function with the name of a raster

Worker:
    Fill the DEM
    Create a slope raster
    Calculate a flow direction raster
    Calculate a flow accumulation raster
    Convert those stream rasters to polygon or polyline feature classes.
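Structurally, the conversion then looks something like the following sketch (ignoring, for the moment, details such as per-process scratch workspaces, setting the Python executable, and the fact that a script tool needs the worker in its own file; the names and paths are made up):

import multiprocessing
import arcpy

def worker(tile):
    # everything that was inside the loop of the sequential version goes here,
    # using the "memory" workspace for intermediate results where possible
    # ...
    return True   # or False if something went wrong

def mp_handler():
    arcpy.env.workspace = r"C:\489\data"          # placeholder path
    rasters = arcpy.ListRasters("dem_*")          # sequential part: build the task list
    with multiprocessing.Pool() as pool:
        results = pool.map(worker, rasters)       # map each raster onto a worker
    arcpy.AddMessage(f"{results.count(False)} tiles failed")

if __name__ == "__main__":
    mp_handler()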
Bear in mind that not all multiprocessing conversions are this simple. We need to remember that user output can be complicated because multiple workers might attempt to write messages to our screen at once, and those messages can get garbled and mixed together. A workaround for this problem is to use Python's logging library, which is much better at handling messages from multiple processes than manually using print statements. We haven't implemented logging in the sample solution for this script, but feel free to briefly investigate it to supplement the print and arcpy.AddMessage functions with calls to the logging library. The Python Logging Cookbook has some helpful examples.
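A minimal sketch of what that could look like is shown below, with each worker configuring a logger that includes the process name in every message (the Logging Cookbook describes more robust approaches, such as sending records to a queue handled by the main process):

import logging
import multiprocessing

def worker(raster_name):
    # each child process configures its own logging; %(processName)s identifies the worker
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(processName)s %(message)s")
    logging.info("Processing %s", raster_name)
    # ... the geoprocessing for this raster would happen here ...
    return True

if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        pool.map(worker, ["dem_1", "dem_2", "dem_3", "dem_4"])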
As an exercise, attempt to implement the conversion from sequential to multiprocessing. You will probably not get everything right since there are a few details that need to be taken into account such as setting up an individual scratch workspace for each call of the worker function. In addition, to be able to run as a script tool the script needs to be separated into two files with the worker function in its own file. But don't worry about these things, just try to set up the overall structure in the same way as in the Cherry-O multiprocessing version and then place the code from the sequential version of the raster example either in the main function or worker function depending on where you think it needs to go. Then check out the solution linked below.
Click here for one way of implementing the solution
When you run this code, do you notice any performance differences between the sequential and multiprocessor versions?
The sequential version took 96 seconds on the same 4-processor PC we were using in the Cherry-O example, while the multiprocessing version completed in 58 seconds. Again, not 4 times faster as we might expect, but nearly twice as fast is a good improvement. For reference, the 32-processor PC from the Cherry-O example processed the sequential code in 110 seconds and the multiprocessing version in 40 seconds. We will look in more detail at the individual lines of code and their performance when we examine code profiling, but you might also find it useful to watch the CPU usage tab in Task Manager to see how hard (or not) your PC is working.
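If you want to make your own rough comparison before we get to profiling, wrapping the call to the main function with time.time() is sufficient (a simple sketch; mp_handler() stands in for whichever version you are timing):

import time

start = time.time()
mp_handler()   # or the sequential version, for comparison
end = time.time()
print(f"Total processing time: {end - start:.1f} seconds")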
1.6.6.2 Multiprocessing with vector data
The best practices of multiprocessing that we introduced earlier are even more important when we are working with vector data than they are with raster data. The geodatabase locking issue in particular is likely to become much more of a factor, because vector workflows typically involve more datasets than raster workflows and feature classes are very often stored in geodatabases.
The example we're going to use here involves clipping a feature layer by polygons in another feature layer. A sample use case might be needing to segment one or several infrastructure layers by state or county (or an even smaller subdivision). If you want to provide each state or county with a version of the roads, sewer, water or electricity layers (for example), this would be a helpful script. To test out the code in this section (and also the first homework assignment), you can again use the data from the USA.gdb geodatabase (Section 1.5) we provided. The application then is to clip the data from the roads, cities, or hydrology data sets to the individual state polygons from the States data set in the geodatabase.
To achieve this task, one could run the Clip tool manually in ArcGIS Pro but if there are a lot of polygons in the clip data set, it will be more effective to write a script that performs the task. As each state/county is unrelated to the others, this is an example of an operation that can be run in parallel.
Let us examine the code's logic and then we'll dig into the syntax. The code consists of two Python files. This is important because, when we want to be able to run it as a script tool in ArcGIS, the worker function for running the individual tasks must be defined in its own module file, not in the main script file containing the multiprocessing code that calls the worker function. The first file, called scripttool.py, imports arcpy, multiprocessing, and the worker code contained in the second Python file, called multicode.py. It contains the definition of the main function mp_handler(), which is responsible for managing the multiprocessing operations, similar to the Cherry-O multiprocessing version. It uses two script tool parameters: the file containing the polygons to use for clipping (variable clipper) and the file to be clipped (variable tobeclipped). The function also takes care of determining the correct Python interpreter (pythonw.exe) to use for running the subprocesses, which matters when the code is run as a script tool in ArcGIS. The main function mp_handler() calls the worker(...) function located in the multicode file, passing it the files to be used and other information needed to perform the clipping operation. This will be further explained below. The code for the first file, including the main function, is shown below.
import os, sys
import arcpy
import multiprocessing
from multicode import worker

# Input parameters
clipper = arcpy.GetParameterAsText(0) if arcpy.GetParameterAsText(0) else r"C:\489\USA.gdb\States"
tobeclipped = arcpy.GetParameterAsText(1) if arcpy.GetParameterAsText(1) else r"C:\489\USA.gdb\Roads"

def mp_handler():

    try:

        # Create a list of object IDs for clipper polygons
        arcpy.AddMessage("Creating Polygon OID list...")
        clipperDescObj = arcpy.Describe(clipper)
        field = clipperDescObj.OIDFieldName

        idList = []
        with arcpy.da.SearchCursor(clipper, [field]) as cursor:
            for row in cursor:
                id = row[0]
                idList.append(id)

        arcpy.AddMessage(f"There are {len(idList)} object IDs (polygons) to process.")

        # Create a task list with parameter tuples for each call of the worker function. Tuples consist of the clipper,
        # tobeclipped, field, and oid values.

        jobs = []

        for id in idList:
            # adds tuples of the parameters that need to be given to the worker function to the jobs list
            jobs.append((clipper, tobeclipped, field, id))

        arcpy.AddMessage(f"Job list has {len(jobs)} elements.")

        # Create and run multiprocessing pool.
        # Set the python exe. Make sure the pythonw.exe is used for running processes, even when this is run as a
        # script tool, or it will launch n number of Pro applications.
        multiprocessing.set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
        arcpy.AddMessage(f"Using {os.path.join(sys.exec_prefix, 'pythonw.exe')}")

        # determine number of cores to use
        cpuNum = multiprocessing.cpu_count()
        arcpy.AddMessage(f"There are {cpuNum} cpu cores on this machine.")

        # Create the pool object
        with multiprocessing.Pool(processes=cpuNum) as pool:
            arcpy.AddMessage("Sending to pool")
            # run jobs in job list; res is a list with return values of the worker function
            res = pool.starmap(worker, jobs)

        # If an error has occurred within the workers, report it.
        # Count how many times False appears in the list (res) with the return values.
        failed = res.count(False)
        if failed > 0:
            arcpy.AddError(f"{failed} workers failed!")

        arcpy.AddMessage("Finished multiprocessing!")

    except Exception as ex:
        arcpy.AddError(ex)

if __name__ == '__main__':
    mp_handler()
Let's now have a close look at the logic of the two main functions which will do the work. The first one is the mp_handler() function shown in the code section above. It takes the input variables and has the job of processing the polygons in the clipping file to get a list of their unique IDs, building a job list of parameter tuples that will be given to the individual calls of the worker function, setting up the multiprocessing pool and running it, and taking care of error handling.
The second function is the worker function called by the pool (named worker in this example) located in the multicode.py file (code shown below). This function takes the name of the clipping feature layer, the name of the layer to be clipped, the name of the field that contains the unique IDs of the polygons in the clipping feature layer, and the feature ID identifying the particular polygon to use for the clipping as parameters. This function will be called from the pool constructed in mp_handler().
The worker function will then make a selection from the clipping layer. This has to happen in the worker function because all parameters given to that function in a multiprocessing scenario need to be of a simple type that can be "pickled." Pickling data means converting it to a byte stream – a sequence of bytes that represents the object in a format that can be stored or transmitted. Feature classes, which contain both spatial and non-spatial data, are much more complicated than that and cannot readily be converted to such a simple representation. That means feature classes cannot be "pickled", and any selections that might have been made in the calling function are not shared with the worker functions.
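You can convince yourself of the difference with the pickle module: a tuple of plain strings and numbers like the ones we will build below pickles without problems, whereas arcpy objects such as layers or cursors do not (a small illustrative sketch; the paths and OID are placeholders):

import pickle

# a tuple of simple types - exactly what we will append to the jobs list -
# can be pickled and therefore passed to a worker process
params = (r"C:\489\USA.gdb\States", r"C:\489\USA.gdb\Roads", "OBJECTID", 12)
print(pickle.loads(pickle.dumps(params)))

# trying the same with an arcpy object (a feature layer, a cursor, a Describe
# object, ...) would typically raise an error instead, which is why we pass the
# OID and make the selection inside the worker function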
We need to think about creative ways of getting our data shared with our sub-processes. In this case, that means we're not going to do the selection in the master module and pass the selected polygon to the worker module. Instead, we're going to create a list of feature IDs that we want to process, and we'll pass an ID from that list as a parameter with each call of the worker function; the worker can then make the selection for that ID on its own before performing the clipping operation. For this, the worker function selects the polygon whose ID matches the oid parameter when creating a layer with MakeFeatureLayer_management() and uses this selection to clip the feature layer to be clipped. The results are saved in a shapefile whose name includes the OID to ensure that the names are unique.
import arcpy

def worker(clipper, tobeclipped, field, oid):
    """
       This is the function that gets called and does the work of clipping the input feature class to one of the
       polygons from the clipper feature class. Note that this function does not try to write to arcpy.AddMessage() as
       nothing is ever displayed.

       param: clipper
       param: tobeclipped
       param: field
       param: oid
    """
    try:

        # Create a layer with only the polygon with ID oid. Each clipper layer needs a unique name, so we include oid in the layer name.
        query = f"{field} = {oid}"
        tmp_flayer = arcpy.MakeFeatureLayer_management(clipper, f"clipper_{oid}", query)

        # Do the clip. We include the oid in the name of the output feature class.
        outFC = fr"c:\489\output\clip_{oid}.shp"
        arcpy.Clip_analysis(tobeclipped, tmp_flayer, outFC)

        # uncomment for debugging
        # arcpy.AddMessage(f"finished clipping: {oid}")

        return True  # everything went well so we return True

    except:
        # Some error occurred so return False
        # print("error condition")
        return False
Having covered the logic of the code, let's review the specific syntax used to make it all work. While you’re reading this, try visualizing how this code might run sequentially first – that is one polygon being used to clip the to-be-clipped feature class, then another polygon being used to clip the to-be-clipped feature class and so on (maybe through 4 or 5 iterations). Then once you have an understanding of how the code is running sequentially try to visualize how it might run in parallel with the worker function being called 4 times simultaneously and each worker performing its task independently of the other workers.
We'll start by exploring the syntax within the mp_handler(...) function.
The mp_handler(...) function begins by determining the name of the field that contains the unique IDs of the clipper feature class using the arcpy.Describe(...) function (lines 16 and 17). The code then uses a Search Cursor to get a list of all of the object (feature) IDs from within the clipper polygon feature class (lines 20 to 23). This gives us a list of IDs that we can pass to our worker function along with the other parameters. As a check, the total count of that list is printed out (line 25).
Next, we create the job list with one entry for each call of the worker() function we want to make (lines 30 to 34). Each element in this list is a tuple of the parameters that should be given to that particular call of worker(). This list will be required when we set up the pool by calling pool.starmap(...). To construct the list, we simply loop through the ID list and append a parameter tuple to the list in variable jobs. The first three parameters will always be the same for all tuples in the job list; only the polygon ID will be different. In the homework assignment for this lesson, you will adapt this code to work with multiple input files to be clipped. As a result, the parameter tuples will vary in both the values for the oid parameter and for the tobeclipped parameter.
To prepare the multiprocessing pool, we first specify which executable should be used each time a worker is spawned (line 41). Without this line, a new instance of ArcGIS Pro would be launched by each worker, which is clearly less than ideal. Instead, this line uses the built-in sys.exec_prefix variable and joins that path to the pythonw.exe executable. The next line prints out the path of the interpreter being used so you can verify that it is the right one.
The code then sets up the size of the pool using the maximum number of processors in lines 45-49 (as we have done in previous examples) and then, using the starmap() method of Pool, calls the worker function worker(...) once for each parameter tuple in the jobs list (line 52).
Any outputs from the worker function will be stored in variable res as a list. These are the boolean values returned by the worker() function, True to indicate that everything went ok and False to indicate that the operation failed. If there is at least one False value in the list, an error message is produced stating the exact number of worker processes that failed (lines 57 to 58).
Let's now look at the code in our worker function worker(...) in the multicode file. As we noted in the logic section above, it receives four parameters: the full paths of the clipping and to-be-clipped feature classes, the name of the field that contains the unique IDs in the clipper feature class, and the OID of the polygon it is to use for the clipping.
Notice that the MakeFeatureLayer_management(...) function in line 18 is used to create a layer in memory which is a copy of the original clipper layer. This memory layer is important in three ways: first, performance – memory layers are faster; second, using a memory layer helps prevent any chance of file locking (although not if we were writing back to the file); third, selections only work on layers, so even if we wanted to, we couldn't get away without creating this layer.
The call of MakeFeatureLayer_management(...) also includes an SQL query string, defined one line earlier in line 17, so that the layer is created with just the polygon that matches the oid passed as a parameter. The name of the layer we are producing here needs to be unique, which is why we include the oid in the layer name given as the second parameter.
Now, with our selection held in a uniquely named memory feature layer, we perform the clip against our to-be-clipped layer (line 22) and store the result in outFC, which we defined earlier in line 21 as a hardcoded folder path plus a unique file name starting with "clip_" followed by the oid. To run and test the code, you will most likely have to adapt the path used in variable outFC to match your PC.
The worker function then returns, and the process will be given another parameter tuple (and hence another oid) from the jobs list. This repeats until a call has been made for each polygon in the clipping feature class.
We are going to use this code as the basis for our Lesson 1 homework project. Have a look at the Assignment Page for full details.
You can test this code out by running it in a number of ways. If you run it from ArcGIS Pro as a script tool, the ternary operator will use the input values from GetParameterAsText(); if it is executed outside the Pro environment (from an IDE or from the command line), the hardcoded paths will be used, since arcpy.GetParameterAsText(i) returns an empty string (which evaluates as False) when no parameter is provided.
The final thing to remember about this code is that it has a hardcoded output path defined in variable outFC in the worker() function, which you will want to change, create, and/or parameterize so that you have some output to investigate. If you do none of these things, then no output will be created.
When the code runs it will create a shapefile for every unique object identifier in the "clipper" shapefile (there are 51 in the States data set from the sample data) named using the OID (that is clip_1.shp - clip_59.shp).
1.6.6.3 The if __name__ == "__main__": revisited
In the examples we've used in the previous two sections, the worker function script has been separated from the main caller script and protected against possible infinite recursion or namespace conflicts by importing the worker script as a module (module.function()). However, what if you are limited to using a single script file, such as a Pro script tool that needs to be condensed for easy distribution? If we just add the worker function to the main script and try to reference it as usual (function()), each new process will cause the multiprocessing module to import the entire main script and execute any top-level code – that is, any code not protected by the if __name__ == "__main__": block or contained in function and class definitions. Besides the risk of infinite recursion, this can create conflicts in some custom Python environments such as Pro, leading to script failure, and it could potentially lock up your PC by launching more and more instances of Pro as each new process in turn spawns further processes.
Remember that Python sets special variables when a script is executed. One of these is __name__, which is set to "__main__" when the script is run directly, or set to the script's (module's) name when it is being imported. By importing the main script under its own name within the if __name__ == "__main__": block and referencing the worker function through it using the standard <main_script_name>.<function>() syntax, you ensure that the multiprocessing module references the function in an importable module rather than in __main__, and that when each child process imports the main script, the code inside its if __name__ == "__main__": block is not executed.
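Below is a hedged sketch of this single-file pattern (the file name singlefile.py and the trivial worker body are made up for illustration; in a real script tool the worker would contain your geoprocessing code):

# singlefile.py - worker and multiprocessing code in one file
import os, sys
import multiprocessing

def worker(oid):
    # the per-task work (clipping, raster processing, ...) would go here
    return oid * oid

def mp_handler(tasks):
    # when run as a script tool, make sure pythonw.exe is used for the child processes
    multiprocessing.set_executable(os.path.join(sys.exec_prefix, "pythonw.exe"))
    with multiprocessing.Pool() as pool:
        # reference the worker through the imported module, not as a bare name,
        # so the child processes import "singlefile" rather than re-running __main__
        return pool.map(singlefile.worker, tasks)

if __name__ == "__main__":
    import singlefile   # import this script as a module under its own name
    results = mp_handler(range(10))
    print(results)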