User-Defined Functions#
Vertica is an analytical database with many advanced functions. While Vertica obviously doesn’t have every function, it comes pretty close by implementing a Python SDK that allows users to create and import their own “lambda” functions into Vertica.
While the SDK is somewhat difficult to use, VerticaPy makes it easy by generating UDFs from Python code and importing them into Vertica for you.
To demonstrate this, let’s look at the following example. We want to bring into Vertica two functions from the ‘math’ module. We also want to import our own custom ‘normalize_titanic’ function defined in the ‘pmath.py’ file.
[1]:
import math
import os

import verticapy as vp
from verticapy.udf import create_lib_udf

# Custom function to export: standardizes an (age, fare) pair using
# hard-coded offsets and scales and returns the two results as a tuple.


def normalize_titanic(age, fare):
    return (age - 30.15) / 14.44, (fare - 33.96) / 52.65


# The generated files must be placed in the folder: /home/dbadmin/
file_path = "/home/dbadmin/python_math_lib.py"
pmath_path = os.path.dirname(vp.__file__) + "/tests/udf/pmath.py"

# Each entry describes one function to export, as a 5-tuple:
#   (callable, input types, return type(s), parameter types, SQL function name)
# A dict as return type names several output columns; in the generated code
# shown below that entry becomes a transform function rather than a scalar one.
udx_str, udx_sql = create_lib_udf(
    [
        (math.exp, [float], float, {}, "python_exp"),
        (math.isclose, [float, float], bool, {"abs_tol": float}, "python_isclose"),
        (
            normalize_titanic,
            [float, float],
            {"norm_age": float, "norm_fare": float},
            {},
            "python_norm_titanic",
        ),
    ],
    library_name="python_math",
    include_dependencies=pmath_path,
    file_path=file_path,
    # Return the UDx source and the SQL statements instead of writing files.
    create_file=False,
)
We simply need to provide some information about our functions (input/output types, parameters, library name, etc.) and VerticaPy will generate the contents of two files, returned here as strings because create_file is set to False.
One for the UDx:
[2]:
print(udx_str)
import vertica_sdk
import math

def normalize_titanic(age, fare):
    return (age - 30.15) / 14.44, (fare - 33.96) / 52.65

class verticapy_python_exp(vertica_sdk.ScalarFunction):
    def setup(self, server_interface, col_types):
        self.params = {}
    def processBlock(self, server_interface, arg_reader, res_writer):
        while(True):
            inputs = []
            inputs += [arg_reader.getFloat(0)]
            result = math.exp(*inputs, **self.params)
            res_writer.setFloat(result)
            res_writer.next()
            if not arg_reader.next():
                break
    def destroy(self, server_interface, col_types):
        pass

class verticapy_python_exp_factory(vertica_sdk.ScalarFunctionFactory):
    def createScalarFunction(self, srv):
        return verticapy_python_exp()
    def getPrototype(self, server_interface, arg_types, return_type):
        arg_types.addFloat()
        return_type.addFloat()
    def getReturnType(self, server_interface, arg_types, return_type):
        return_type.addFloat()

class verticapy_python_isclose(vertica_sdk.ScalarFunction):
    def setup(self, server_interface, col_types):
        params = server_interface.getParamReader()
        self.params = {}
        if params.containsParameter('abs_tol'):
            self.params['abs_tol'] = params.getFloat('abs_tol')
    def processBlock(self, server_interface, arg_reader, res_writer):
        while(True):
            inputs = []
            inputs += [arg_reader.getFloat(0)]
            inputs += [arg_reader.getFloat(1)]
            result = math.isclose(*inputs, **self.params)
            res_writer.setBool(result)
            res_writer.next()
            if not arg_reader.next():
                break
    def destroy(self, server_interface, col_types):
        pass

class verticapy_python_isclose_factory(vertica_sdk.ScalarFunctionFactory):
    def createScalarFunction(self, srv):
        return verticapy_python_isclose()
    def getPrototype(self, server_interface, arg_types, return_type):
        arg_types.addFloat()
        arg_types.addFloat()
        return_type.addBool()
    def getReturnType(self, server_interface, arg_types, return_type):
        return_type.addBool()
    def getParameterType(self, server_interface, parameterTypes):
        parameterTypes.addFloat('abs_tol')

class verticapy_python_norm_titanic(vertica_sdk.TransformFunction):
    def setup(self, server_interface, col_types):
        self.params = {}
    def processPartition(self, server_interface, arg_reader, res_writer):
        while(True):
            inputs = []
            inputs += [arg_reader.getFloat(0)]
            inputs += [arg_reader.getFloat(1)]
            result = normalize_titanic(*inputs, **self.params)
            if len(result) == 1:
                result = result[0]
            res_writer.setFloat(0, result[0])
            res_writer.setFloat(1, result[1])
            res_writer.next()
            if not arg_reader.next():
                break
    def destroy(self, server_interface, col_types):
        pass

class verticapy_python_norm_titanic_factory(vertica_sdk.TransformFunctionFactory):
    def createTransformFunction(self, srv):
        return verticapy_python_norm_titanic()
    def getPrototype(self, server_interface, arg_types, return_type):
        arg_types.addFloat()
        arg_types.addFloat()
        return_type.addFloat()
        return_type.addFloat()
    def getReturnType(self, server_interface, arg_types, return_type):
        return_type.addFloat('norm_age')
        return_type.addFloat('norm_fare')
And one for the SQL statements.
[3]:
print("\n".join(udx_sql))
CREATE OR REPLACE LIBRARY python_math AS '/home/dbadmin/python_math_lib.py' LANGUAGE 'Python';
CREATE OR REPLACE FUNCTION python_exp AS NAME 'verticapy_python_exp_factory' LIBRARY python_math;
CREATE OR REPLACE FUNCTION python_isclose AS NAME 'verticapy_python_isclose_factory' LIBRARY python_math;
CREATE OR REPLACE TRANSFORM FUNCTION python_norm_titanic AS NAME 'verticapy_python_norm_titanic_factory' LIBRARY python_math;
We can then run these queries on our server.
Our functions are now available in Vertica.
[4]:
%load_ext verticapy.sql
%sql SELECT python_exp(1);
[4]:
123 python_expFloat | |
1 | 2.71828182845905 |
[5]:
%sql SELECT python_isclose(2, 3 USING PARAMETERS abs_tol=0.01);
[5]:
010 python_iscloseBoolean | |
1 |
[6]:
%sql SELECT python_isclose(2, 3 USING PARAMETERS abs_tol=1);
[6]:
010 python_iscloseBoolean | |
1 |
[7]:
%sql SELECT python_norm_titanic(20.0, 30.0) OVER();
[7]:
123 norm_ageFloat | 123 norm_fareFloat | |
1 | -0.702908587257618 | -0.0752136752136752 |
It is now easy to bring customized Python functions into Vertica.