SourceFactory Class
If you write a source, you must also write a source factory. Your subclass of the SourceFactory class is responsible for:
- Performing the initial validation of the parameters passed to your UDSource.
- Setting up any data structures your UDSource instances need to perform their work. This information can include recording which nodes will read which data source.
- Creating one instance of your UDSource subclass for each data source (or portion thereof) that your function reads on each host.
The simplest source factory creates one UDSource instance per data source per executor node. You can also use multiple concurrent UDSource instances on each node; this behavior is called concurrent load. To support both options, SourceFactory has two versions of the method that creates the sources. You must implement exactly one of them.
Source factories are singletons. Your subclass must be stateless, with no fields containing data. The subclass also must not modify any global variables.
SourceFactory Methods
The SourceFactory class defines several methods. Your class must override one version of prepareUDSources() (in C++, either prepareUDSources() or prepareUDSourcesExecutor()); it may override the other methods.
Setting Up
Vertica calls plan() once on the initiator node to perform the following tasks:
- Check the parameters the user supplied to the function call in the COPY statement and provide error messages if there are any issues. You can read the parameters by getting a ParamReader object from the instance of ServerInterface passed into the plan() method.
- Decide which hosts in the cluster will read the data source. How you divide up the work depends on the source your function is reading. Some sources can be split across many hosts, such as a source that reads data from many URLs. Others, such as an individual local file on a host's file system, can be read only by a single specified host.
  You store the list of hosts to read the data source by calling the setTargetNodes() method on the NodeSpecifyingPlanContext object. This object is passed into your plan() method.
- Store any information that the individual hosts need to process the data sources in the NodeSpecifyingPlanContext instance passed to the plan() method. For example, you could store assignments that tell each host which data sources to process. The plan() method runs only on the initiator node, and the prepareUDSources() method runs on each host reading from a data source. Therefore, this object is the only means of communication between them.
  You store data in the NodeSpecifyingPlanContext by getting a ParamWriter object from the getWriter() method. You then write parameters by calling methods on the ParamWriter, such as setString(). ParamWriter can store only simple data types. For complex types, you must serialize the data in some manner and store it as a string or long string.
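To make the planning bookkeeping concrete, here is a standalone C++ sketch of what a plan() implementation might compute: it assigns a list of source names to nodes round-robin and packs each node's list into one delimited string, because ParamWriter stores only simple types. It does not use the Vertica SDK. The helper names (assignRoundRobin(), serializeList(), parseList()) and the '|' delimiter are hypothetical. In an actual factory, the nodes that received an assignment would be passed to setTargetNodes(), and each packed string could be written with setString() under a per-node key of your own choosing.

```cpp
#include <cstddef>
#include <map>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: distribute sources across nodes round-robin.
// Returns node name -> sources that node should read. Nodes that receive
// nothing are absent from the map, so they need not be targeted at all.
std::map<std::string, std::vector<std::string>>
assignRoundRobin(const std::vector<std::string> &sources,
                 const std::vector<std::string> &nodes) {
    std::map<std::string, std::vector<std::string>> plan;
    if (nodes.empty()) return plan;
    for (std::size_t i = 0; i < sources.size(); ++i) {
        plan[nodes[i % nodes.size()]].push_back(sources[i]);
    }
    return plan;
}

// Hypothetical helper: pack a list into one string so it can be stored as
// a simple type. Assumes no source name contains the '|' delimiter.
std::string serializeList(const std::vector<std::string> &items) {
    std::string out;
    for (std::size_t i = 0; i < items.size(); ++i) {
        if (i > 0) out += '|';
        out += items[i];
    }
    return out;
}

// Inverse of serializeList(), as each executor host might use it to
// recover its own assignment.
std::vector<std::string> parseList(const std::string &packed) {
    std::vector<std::string> items;
    if (packed.empty()) return items;
    std::stringstream ss(packed);
    std::string item;
    while (std::getline(ss, item, '|')) items.push_back(item);
    return items;
}
```

On the executor side, prepareUDSources() would read the string back from the plan context and decode it with something like parseList() before creating its UDSource instances.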
Creating Sources
Vertica calls prepareUDSources() on all hosts that the plan() method selected to load data. This call instantiates and returns a list of UDSource subclass instances. If you are not using concurrent load, return one UDSource for each of the sources that the host is assigned to process. If you are using concurrent load, use the version of the method that takes an ExecutorPlanContext as a parameter, and return as many sources as you can use. Your factory must implement exactly one of these methods.
In the C++ API, the function that supports concurrent load is named prepareUDSourcesExecutor(). In the Java API, the class provides two overloads of prepareUDSources().
For concurrent load, you can find out how many threads are available on the node to run UDSource instances by calling getLoadConcurrency() on the ExecutorPlanContext that is passed in.
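One possible concurrent-load strategy is to spread the sources assigned to a host over at most as many UDSource instances as the load concurrency allows. The standalone sketch below shows only that grouping logic. groupForConcurrency() is a hypothetical helper; in a real C++ prepareUDSourcesExecutor(), the concurrency value would come from getLoadConcurrency(), and each group could back one UDSource instance, assuming your UDSource subclass can read several sources in sequence.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: split the sources assigned to this host into at most
// `concurrency` groups, one group per UDSource instance to be created.
std::vector<std::vector<std::string>>
groupForConcurrency(const std::vector<std::string> &sources,
                    std::size_t concurrency) {
    // Treat a concurrency of 0 as 1, and never create more groups than
    // there are sources (so no UDSource would be left with nothing to read).
    std::size_t groups = std::min(std::max<std::size_t>(concurrency, 1),
                                  sources.size());
    std::vector<std::vector<std::string>> result(groups);
    for (std::size_t i = 0; i < sources.size(); ++i) {
        result[i % groups].push_back(sources[i]);  // round-robin fill
    }
    return result;
}
```

Capping the group count at the number of sources avoids creating idle UDSource instances when a host has fewer sources than available threads.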
Defining Parameters
Implement getParameterType() to define the names and types of parameters that your source uses. Vertica uses this information to warn callers about unknown or missing parameters. Vertica ignores unknown parameters and uses default values for missing parameters. While you should define the types and parameters for your function, you are not required to override this method.
Requesting Threads for Concurrent Load
When a source factory creates sources on an executor node, by default it creates one thread per source. If your sources can use multiple threads, implement getDesiredThreads(). Vertica calls this method before it calls prepareUDSources(), so you can also use it to decide how many sources to create. Return the number of threads your factory can use for sources. The maximum number of available threads is passed in, so you can take that into account. The value your method returns is a hint, not a guarantee; each executor node determines the number of threads to allocate. The FilePortionSourceFactory example implements this method; see C++ Example: Concurrent Load.
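The following standalone sketch shows one way a getDesiredThreads() implementation might size its request: one thread per fixed-size portion of the input, capped at the maximum available. The function desiredThreadsFor() and the fixed portion size are assumptions for illustration, not the FilePortionSourceFactory code, and the sketch returns std::int64_t rather than ssize_t to stay portable.

```cpp
#include <cstdint>

// Hypothetical helper: request one thread per portion of the input, but
// never more than the maximum the node offers. The result is only a hint;
// the executor node decides how many threads it actually allocates.
std::int64_t desiredThreadsFor(std::uint64_t inputBytes,
                               std::uint64_t portionBytes,
                               std::int64_t maxAvailableThreads) {
    if (maxAvailableThreads < 1) return 1;      // always ask for at least one
    if (portionBytes == 0 || inputBytes == 0) return 1;
    // Ceiling division: a partial trailing portion still needs a thread.
    std::uint64_t portions = (inputBytes + portionBytes - 1) / portionBytes;
    if (portions > static_cast<std::uint64_t>(maxAvailableThreads)) {
        return maxAvailableThreads;
    }
    return static_cast<std::int64_t>(portions);
}
```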
You can allow your source to have control over parallelism, meaning that it can divide a single input into multiple load streams, by implementing isSourceApportionable(). Returning true from this method does not guarantee that the source will apportion the load. However, returning false indicates that it will not try to do so. See Apportioned Load for more information.
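To illustrate what dividing a single input into multiple load streams means, this sketch splits an input of known length into contiguous, non-overlapping byte ranges. It is not Vertica's apportioning mechanism: the Portion struct and splitIntoPortions() are hypothetical, and a real apportioned load must also deal with records that cross a portion boundary, which this sketch ignores.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical description of one load stream's share of the input.
struct Portion {
    std::uint64_t offset;  // first byte of this portion
    std::uint64_t size;    // number of bytes in this portion
};

// Split [0, totalBytes) into `streams` contiguous ranges whose sizes differ
// by at most one byte. Records straddling a boundary are NOT handled here.
std::vector<Portion> splitIntoPortions(std::uint64_t totalBytes,
                                       std::uint64_t streams) {
    std::vector<Portion> portions;
    if (streams == 0 || totalBytes == 0) return portions;
    if (streams > totalBytes) streams = totalBytes;  // avoid empty portions
    std::uint64_t base = totalBytes / streams;
    std::uint64_t extra = totalBytes % streams;  // first `extra` get +1 byte
    std::uint64_t offset = 0;
    for (std::uint64_t i = 0; i < streams; ++i) {
        std::uint64_t size = base + (i < extra ? 1 : 0);
        portions.push_back({offset, size});
        offset += size;
    }
    return portions;
}
```

Spreading the remainder one byte at a time keeps every portion within one byte of the others, so no single stream receives a noticeably larger share.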
Often, a SourceFactory that implements getDesiredThreads() also uses apportioned load. However, using apportioned load is not a requirement. A source reading from Kafka streams, for example, could use multiple threads without apportioning.
API
C++
The SourceFactory API provides the following methods for extension by subclasses:

    virtual void plan(ServerInterface &srvInterface, NodeSpecifyingPlanContext &planCtxt);

    // must implement exactly one of prepareUDSources() or prepareUDSourcesExecutor()
    virtual std::vector<UDSource *> prepareUDSources(ServerInterface &srvInterface, NodeSpecifyingPlanContext &planCtxt);
    virtual std::vector<UDSource *> prepareUDSourcesExecutor(ServerInterface &srvInterface, ExecutorPlanContext &planCtxt);

    virtual void getParameterType(ServerInterface &srvInterface, SizedColumnTypes &parameterTypes);

    virtual bool isSourceApportionable();

    ssize_t getDesiredThreads(ServerInterface &srvInterface, ExecutorPlanContext &planContext);

After creating your SourceFactory, you must register it with the RegisterFactory macro.
Java
The SourceFactory API provides the following methods for extension by subclasses:

    public void plan(ServerInterface srvInterface, NodeSpecifyingPlanContext planCtxt) throws UdfException;

    // must implement one overload of prepareUDSources()
    public ArrayList<UDSource> prepareUDSources(ServerInterface srvInterface, NodeSpecifyingPlanContext planCtxt) throws UdfException;
    public ArrayList<UDSource> prepareUDSources(ServerInterface srvInterface, ExecutorPlanContext planCtxt) throws UdfException;

    public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

    public boolean isSourceApportionable();

    public int getDesiredThreads(ServerInterface srvInterface, ExecutorPlanContext planCtxt) throws UdfException;