User-Defined Functions#

Vertica is an analytical database with many advanced functions. While Vertica obviously doesn’t have every function, it comes pretty close by implementing a Python SDK that allows users to create and import their own “lambda” functions into Vertica.

While the SDK is somewhat difficult to use, VerticaPy makes it easy by generating UDFs from python code and importing them into Vertica for you.

To demonstrate this, let’s look at the following example. We want to bring into Vertica two functions from the ‘math’ module. We also want to import our own custom ‘normalize_titanic’ function defined in the ‘pmath.py’ file.

[1]:
from verticapy.udf import create_lib_udf
import verticapy as vp

def normalize_titanic(age, fare):
    return (age - 30.15) / 14.44, (fare - 33.96) / 52.65

# The generated files must be placed in the folder: /home/dbadmin/
file_path = "/home/dbadmin/python_math_lib.py"
pmath_path = os.path.dirname(vp.__file__) + "/tests/udf/pmath.py"
udx_str, udx_sql = create_lib_udf([(math.exp, [float], float, {}, "python_exp"),
                                   (math.isclose, [float, float], bool, {"abs_tol": float}, "python_isclose"),
                                   (normalize_titanic, [float, float], {"norm_age": float, "norm_fare": float}, {}, "python_norm_titanic"),],
                                   library_name = "python_math",
                                   include_dependencies = pmath_path,
                                   file_path = file_path,
                                   create_file = False)

We simply need to provide some information about our functions (input/output types, parameters, library name, etc.) and VerticaPy will generate two files.

One for the UDx:

[2]:
print(udx_str)
import vertica_sdk
import math

def normalize_titanic(age, fare):
    return (age - 30.15) / 14.44, (fare - 33.96) / 52.65

class verticapy_python_exp(vertica_sdk.ScalarFunction):

        def setup(self, server_interface, col_types):
                self.params = {}

        def processBlock(self, server_interface, arg_reader, res_writer):
                while(True):
                        inputs  = []
                        inputs += [arg_reader.getFloat(0)]
                        result = math.exp(*inputs, **self.params)
                        res_writer.setFloat(result)
                        res_writer.next()
                        if not arg_reader.next():
                                break

        def destroy(self, server_interface, col_types):
                pass

class verticapy_python_exp_factory(vertica_sdk.ScalarFunctionFactory):

        def createScalarFunction(self, srv):
                return verticapy_python_exp()

        def getPrototype(self, server_interface, arg_types, return_type):
                arg_types.addFloat()
                return_type.addFloat()

        def getReturnType(self, server_interface, arg_types, return_type):
                return_type.addFloat()


class verticapy_python_isclose(vertica_sdk.ScalarFunction):

        def setup(self, server_interface, col_types):
                params = server_interface.getParamReader()
                self.params = {}
                if params.containsParameter('abs_tol'):
                        self.params['abs_tol'] = params.getFloat('abs_tol')

        def processBlock(self, server_interface, arg_reader, res_writer):
                while(True):
                        inputs  = []
                        inputs += [arg_reader.getFloat(0)]
                        inputs += [arg_reader.getFloat(1)]
                        result = math.isclose(*inputs, **self.params)
                        res_writer.setBool(result)
                        res_writer.next()
                        if not arg_reader.next():
                                break

        def destroy(self, server_interface, col_types):
                pass

class verticapy_python_isclose_factory(vertica_sdk.ScalarFunctionFactory):

        def createScalarFunction(self, srv):
                return verticapy_python_isclose()

        def getPrototype(self, server_interface, arg_types, return_type):
                arg_types.addFloat()
                arg_types.addFloat()
                return_type.addBool()

        def getReturnType(self, server_interface, arg_types, return_type):
                return_type.addBool()

        def getParameterType(self, server_interface, parameterTypes):
                parameterTypes.addFloat('abs_tol')

class verticapy_python_norm_titanic(vertica_sdk.TransformFunction):

        def setup(self, server_interface, col_types):
                self.params = {}

        def processPartition(self, server_interface, arg_reader, res_writer):
                while(True):
                        inputs  = []
                        inputs += [arg_reader.getFloat(0)]
                        inputs += [arg_reader.getFloat(1)]
                        result = normalize_titanic(*inputs, **self.params)
                        if len(result) == 1:
                                result = result[0]
                        res_writer.setFloat(0, result[0])
                        res_writer.setFloat(1, result[1])
                        res_writer.next()
                        if not arg_reader.next():
                                break

        def destroy(self, server_interface, col_types):
                pass

class verticapy_python_norm_titanic_factory(vertica_sdk.TransformFunctionFactory):

        def createTransformFunction(self, srv):
                return verticapy_python_norm_titanic()

        def getPrototype(self, server_interface, arg_types, return_type):
                arg_types.addFloat()
                arg_types.addFloat()
                return_type.addFloat()
                return_type.addFloat()

        def getReturnType(self, server_interface, arg_types, return_type):
                return_type.addFloat('norm_age')
                return_type.addFloat('norm_fare')


And one for the SQL statements.

[3]:
print("\n".join(udx_sql))
CREATE OR REPLACE LIBRARY python_math AS '/home/dbadmin/python_math_lib.py' LANGUAGE 'Python';
CREATE OR REPLACE FUNCTION python_exp AS NAME 'verticapy_python_exp_factory' LIBRARY python_math;
CREATE OR REPLACE FUNCTION python_isclose AS NAME 'verticapy_python_isclose_factory' LIBRARY python_math;
CREATE OR REPLACE TRANSFORM FUNCTION python_norm_titanic AS NAME 'verticapy_python_norm_titanic_factory' LIBRARY python_math;

We can then run these queries in our server.

Our functions are now available in Vertica.

[4]:
%load_ext verticapy.sql
%sql SELECT python_exp(1);
Execution: 0.11s
[4]:
123
python_exp
Float
12.71828182845905
Rows: 1 | Column: python_exp | Type: Float
[5]:
%sql SELECT python_isclose(2, 3 USING PARAMETERS abs_tol=0.01);
Execution: 0.119s
[5]:
010
python_isclose
Boolean
1
Rows: 1 | Column: python_isclose | Type: Boolean
[6]:
%sql SELECT python_isclose(2, 3 USING PARAMETERS abs_tol=1);
Execution: 0.128s
[6]:
010
python_isclose
Boolean
1
Rows: 1 | Column: python_isclose | Type: Boolean
[7]:
%sql SELECT python_norm_titanic(20.0, 30.0) OVER();
Execution: 0.135s
[7]:
123
norm_age
Float
123
norm_fare
Float
1-0.702908587257618-0.0752136752136752
Rows: 1 | Columns: 2

It is now easy to bring customized Python functions.