Python Example: Multi-Phase Calculation

The following example shows a multi-phase transform function that computes the average of a column of numbers in an input table. It first defines two transform functions and then defines a factory that creates the phases from them.

See AvgMultiPhaseUDT.py in the examples distribution for the complete code.
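The phases in this example pass sums and row counts between them rather than averages. When partitions hold different numbers of rows, averaging the per-partition averages gives the wrong answer, while summing the partial sums and counts gives the right one. The following is a minimal plain-Python sketch of that idea. It does not use the Vertica SDK, and partial and combine are hypothetical helper names.

```python
def partial(values):
    # First phase, run once per partition: emit a sum and a row count.
    return sum(values), len(values)

def combine(partials):
    # Second phase, run once over all partial results.
    total_sum = sum(s for s, _ in partials)
    total_count = sum(n for _, n in partials)
    return total_sum / total_count

# Unequal partition sizes, as can happen when rows are spread across nodes.
partitions = [[1.0], [2.0, 3.0, 4.0]]

correct = combine([partial(p) for p in partitions])
naive = sum(sum(p) / len(p) for p in partitions) / len(partitions)  # average of averages
print(correct, naive)  # 2.5 2.0
```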

Loading and Using the Example

Create the library and function:

=> CREATE LIBRARY pylib_avg AS '/home/dbadmin/udx/AvgMultiPhaseUDT.py' LANGUAGE 'Python'; 
CREATE LIBRARY
=> CREATE TRANSFORM FUNCTION myAvg AS NAME 'MyAvgFactory' LIBRARY pylib_avg;
CREATE TRANSFORM FUNCTION

You can then use the function in SELECT statements:

=> CREATE TABLE IF NOT EXISTS numbers(num FLOAT);
CREATE TABLE

=> COPY numbers FROM STDIN delimiter ',' DIRECT;
1
2
3
4
\.
				
=> SELECT myAvg(num) OVER() FROM numbers;
 average | ignored_rows | total_rows 
---------+--------------+------------
     2.5 |            0 |          4
(1 row)

Setup

All Python UDxs must import the Vertica SDK. This example also imports the standard math module, which it uses to detect infinite and NaN input values.

import vertica_sdk
import math
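The math module is needed because NaN cannot be detected with ==: NaN compares unequal to everything, including itself. The following plain-Python sketch shows the same classification of ignored values that the example applies. It assumes None stands in for SQL NULL (the UDx itself checks input.isNull(0)), and is_ignored and is_ignored_compact are hypothetical helper names.

```python
import math

def is_ignored(value):
    # Mirrors the example's test: NULL (None here), +/-Inf, and NaN are skipped.
    return value is None or math.isinf(value) or math.isnan(value)

def is_ignored_compact(value):
    # Equivalent test: math.isfinite() is False for both Inf and NaN.
    return value is None or not math.isfinite(value)

nan = float("nan")
print(nan == nan)  # False, so equality cannot detect NaN

samples = [1.0, None, float("inf"), float("-inf"), nan, 0.0]
print([is_ignored(v) for v in samples])  # [False, True, True, True, True, False]
```

Both helpers agree on every sample, so math.isfinite() is a compact alternative to the separate isinf() and isnan() calls.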

Component Transform Functions

A multi-phase transform function must define two or more TransformFunction subclasses to be used in the phases. This example uses two classes: LocalCalculation, which does calculations on local partitions, and GlobalCalculation, which aggregates the results of all LocalCalculation instances to calculate a final result.

In both classes, the calculation is done in the processPartition() method:

class LocalCalculation(vertica_sdk.TransformFunction):
    """
    This class is the first phase and calculates the local values for sum, ignored_rows, and total_rows.
    """

    def setup(self, server_interface, col_types):
        server_interface.log("Setup: Phase0")

    def processPartition(self, server_interface, input, output):
        server_interface.log("Process Partition: Phase0")

        # Start the counters at zero for every partition so that results
        # from one partition do not carry over into the next.
        local_sum = 0.0
        ignored_rows = 0
        total_rows = 0

        # The reader is already on the first row, so check for more rows
        # at the end of each iteration.
        while True:
            total_rows += 1

            if input.isNull(0):
                # NULL is ignored
                ignored_rows += 1
            else:
                value = input.getFloat(0)
                if math.isinf(value) or math.isnan(value):
                    # Inf and NaN are ignored
                    ignored_rows += 1
                else:
                    local_sum += value

            if not input.next():
                break

        output.setFloat(0, local_sum)
        output.setInt(1, ignored_rows)
        output.setInt(2, total_rows)
        output.next()

class GlobalCalculation(vertica_sdk.TransformFunction):
    """
    This class is the second phase and aggregates the values for sum, ignored_rows, and total_rows.
    """

    def setup(self, server_interface, col_types):
        server_interface.log("Setup: Phase1")

    def processPartition(self, server_interface, input, output):
        server_interface.log("Process Partition: Phase1")

        global_sum = 0.0
        ignored_rows = 0
        total_rows = 0

        # Each input row is one (local_sum, ignored_rows, total_rows)
        # result produced by the first phase.
        while True:
            global_sum += input.getFloat(0)
            ignored_rows += input.getInt(1)
            total_rows += input.getInt(2)

            if not input.next():
                break

        valid_rows = total_rows - ignored_rows
        if valid_rows > 0:
            output.setFloat(0, global_sum / valid_rows)
        else:
            # Every row was NULL, Inf, or NaN: return NULL instead of
            # dividing by zero.
            output.setNull(0)
        output.setInt(1, ignored_rows)
        output.setInt(2, total_rows)
        output.next()
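Because the Vertica SDK is only available inside the server, the reader/writer loops above can be checked in plain Python against stand-in objects. In this sketch, FakeReader and FakeWriter are hypothetical classes, not SDK classes. They implement only the method names the example calls, and the reader starts on the first row, as the loops assume. local_process and global_process mirror the two processPartition() methods. The global version guards against dividing by zero when every row was ignored, and None stands in for SQL NULL.

```python
import math

class FakeReader:
    """Hypothetical stand-in for the SDK input reader; rows are tuples, None = NULL."""
    def __init__(self, rows):
        self.rows, self.pos = rows, 0  # positioned on the first row
    def isNull(self, col):
        return self.rows[self.pos][col] is None
    def getFloat(self, col):
        return float(self.rows[self.pos][col])
    def getInt(self, col):
        return int(self.rows[self.pos][col])
    def next(self):
        self.pos += 1  # returns False once no rows remain
        return self.pos < len(self.rows)

class FakeWriter:
    """Hypothetical stand-in for the SDK output writer; collects finished rows."""
    def __init__(self):
        self.rows, self.row = [], {}
    def setFloat(self, col, value):
        self.row[col] = value
    def setInt(self, col, value):
        self.row[col] = value
    def setNull(self, col):
        self.row[col] = None
    def next(self):
        self.rows.append(tuple(self.row[c] for c in sorted(self.row)))
        self.row = {}

def local_process(reader, writer):
    # Mirrors LocalCalculation.processPartition().
    local_sum, ignored_rows, total_rows = 0.0, 0, 0
    while True:
        total_rows += 1
        if reader.isNull(0) or not math.isfinite(reader.getFloat(0)):
            ignored_rows += 1
        else:
            local_sum += reader.getFloat(0)
        if not reader.next():
            break
    writer.setFloat(0, local_sum)
    writer.setInt(1, ignored_rows)
    writer.setInt(2, total_rows)
    writer.next()

def global_process(reader, writer):
    # Mirrors GlobalCalculation.processPartition(), with a zero-row guard.
    global_sum, ignored_rows, total_rows = 0.0, 0, 0
    while True:
        global_sum += reader.getFloat(0)
        ignored_rows += reader.getInt(1)
        total_rows += reader.getInt(2)
        if not reader.next():
            break
    valid_rows = total_rows - ignored_rows
    if valid_rows > 0:
        writer.setFloat(0, global_sum / valid_rows)
    else:
        writer.setNull(0)
    writer.setInt(1, ignored_rows)
    writer.setInt(2, total_rows)
    writer.next()

# Two first-phase partitions, then one second-phase partition over their output.
phase0 = FakeWriter()
local_process(FakeReader([(1.0,), (None,), (2.0,)]), phase0)
local_process(FakeReader([(3.0,), (float("nan"),), (4.0,)]), phase0)
print(phase0.rows)  # [(3.0, 1, 3), (7.0, 1, 3)]

phase1 = FakeWriter()
global_process(FakeReader(phase0.rows), phase1)
print(phase1.rows)  # [(2.5, 2, 6)]
```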

Multi-Phase Factory

A MultiPhaseTransformFunctionFactory ties the individual functions together as phases. The factory defines a TransformFunctionPhase subclass for each function. Each phase implements two methods: createTransformFunction(), which constructs and returns an instance of the corresponding TransformFunction, and getReturnType(), which declares the columns that the phase outputs.

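The first phase, LocalPhase, follows. Its getReturnType() verifies that the function was called with exactly one FLOAT argument and declares the three columns that this phase passes to the next phase.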

class MyAvgFactory(vertica_sdk.MultiPhaseTransformFunctionFactory):
    """ Factory class """

    class LocalPhase(vertica_sdk.TransformFunctionPhase):
        """ Phase 1 """
        def getReturnType(self, server_interface, input_types, output_types):
            # Sanity check: exactly one FLOAT argument
            number_of_cols = input_types.getColumnCount()
            if number_of_cols != 1 or not input_types.getColumnType(0).isFloat():
                raise ValueError("Function only accepts one argument (FLOAT)")

            # These columns are the input of the next phase
            output_types.addFloat("local_sum")
            output_types.addInt("ignored_rows")
            output_types.addInt("total_rows")

        def createTransformFunction(self, server_interface):
            return LocalCalculation()

The second phase, GlobalPhase, does not check its inputs because the first phase already did. As with the first phase, createTransformFunction merely constructs and returns the corresponding TransformFunction.

    class GlobalPhase(vertica_sdk.TransformFunctionPhase):
        """ Phase 2 """
        def getReturnType(self, server_interface, input_types, output_types):
            output_types.addFloat("average")
            output_types.addInt("ignored_rows")
            output_types.addInt("total_rows")

        def createTransformFunction(self, server_interface):
            return GlobalCalculation()

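After defining the TransformFunctionPhase subclasses, the factory instantiates them and chains them together in getPhases(), which returns the phases in the order they run. The call to setPrepass() marks the first phase as a pre-pass phase, so that it runs locally on the nodes that hold the input data.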

    ph0Instance = LocalPhase()
    ph1Instance = GlobalPhase()

    def getPhases(self, server_interface):
        # Mark the first phase as pre-pass, then return the phases in execution order
        self.ph0Instance.setPrepass()
        return [self.ph0Instance, self.ph1Instance]
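The way the two phases returned by getPhases() fit together at query time can be modeled roughly in plain Python. This is a sketch under stated assumptions, not a description of how Vertica schedules work. run_phases is a hypothetical driver, and the node names are made up. Each node's rows form one first-phase partition, and all first-phase output is gathered into a single partition for the second phase, as with OVER() in the example query. None stands in for SQL NULL.

```python
import math

def local_phase(rows):
    # First (pre-pass) phase: one (local_sum, ignored_rows, total_rows) row per node.
    local_sum, ignored_rows = 0.0, 0
    for value in rows:
        if value is None or not math.isfinite(value):
            ignored_rows += 1
        else:
            local_sum += value
    return [(local_sum, ignored_rows, len(rows))]

def global_phase(rows):
    # Second phase: a single row with the overall average and counts.
    global_sum = sum(r[0] for r in rows)
    ignored_rows = sum(r[1] for r in rows)
    total_rows = sum(r[2] for r in rows)
    valid_rows = total_rows - ignored_rows
    average = global_sum / valid_rows if valid_rows else None
    return [(average, ignored_rows, total_rows)]

def run_phases(node_data):
    # Hypothetical driver: run the first phase on every node's rows, gather
    # all of its output into one partition, then run the second phase once.
    gathered = [row for rows in node_data.values() for row in local_phase(rows)]
    return global_phase(gathered)

# The four values from the COPY example, split across two hypothetical nodes.
result = run_phases({"node0001": [1.0, 3.0], "node0002": [2.0, 4.0]})
print(result)  # [(2.5, 0, 4)]
```

The result matches the average, ignored_rows, and total_rows columns that the SELECT statement earlier in this section returns.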