SourceFactory Class

If you write a source, you must also write a source factory. Your subclass of the SourceFactory class is responsible for:

  • Performing the initial validation of the parameters passed to your UDSource.
  • Setting up any data structures your UDSource instances need to perform their work. This information can include recording which nodes will read which data source.
  • Creating one instance of your UDSource subclass for each data source (or portion thereof) that your function reads on each host.

The simplest source factory creates one UDSource instance per data source per executor node. You can also use multiple concurrent UDSource instances on each node. This behavior is called concurrent load. To support both options, SourceFactory has two versions of the method that creates the sources. You must implement exactly one of them.

Source factories are singletons. Your subclass must be stateless, with no fields containing data. The subclass also must not modify any global variables.

SourceFactory Methods

The SourceFactory class defines several methods. Your class must override prepareUDSources(); it may override the other methods.

Setting Up

Vertica calls plan() once on the initiator node to perform the following tasks:

  • Check the parameters the user supplied to the function call in the COPY statement and provide error messages if there are any issues. You can read the parameters by getting a ParamReader object from the instance of ServerInterface passed into the plan() method.
  • Decide which hosts in the cluster will read the data source. How you divide up the work depends on the source your function is reading. Some sources can be split across many hosts, such as a source that reads data from many URLs. Others, such as an individual local file on a host's file system, can be read only by a single specified host.

    You store the list of hosts to read the data source by calling the setTargetNodes() method on the NodeSpecifyingPlanContext object. This object is passed into your plan() method.

  • Store any information that the individual hosts need to process the data sources in the NodeSpecifyingPlanContext instance passed to the plan() method. For example, you could store assignments that tell each host which data sources to process. The plan() method runs only on the initiator node, and the prepareUDSources() method runs on each host reading from a data source. Therefore, this object is the only means of communication between them.

    You store data in the NodeSpecifyingPlanContext by getting a ParamWriter object from the getWriter() method. You then write parameters by calling methods on the ParamWriter such as setString().

    ParamWriter offers the ability to store only simple data types. For complex types, you must serialize the data in some manner and store it as a string or long string.

Creating Sources

Vertica calls prepareUDSources() on all hosts that the plan() method selected to load data. This call instantiates and returns a list of UDSource subclass instances. If you are not using concurrent load, return one UDSource for each of the sources that the host is assigned to process. If you are using concurrent load, use the version of the method that takes an ExecutorPlanContext as a parameter, and return as many sources as you can use. Your factory must implement exactly one of these methods.

In the C++ API, the function that supports concurrent load is named prepareUDSourcesExecutor(). In the Java API the class provides two overloads of prepareUDSources().

For concurrent load, you can find out how many threads are available on the node to run UDSource instances by calling getLoadConcurrency() on the ExecutorPlanContext that is passed in.

Defining Parameters

Implement getParameterTypes() to define the names and types of parameters that your source uses. Vertica uses this information to warn callers about unknown or missing parameters. Vertica ignores unknown parameters and uses default values for missing parameters. While you should define the types and parameters for your function, you are not required to override this method.

Requesting Threads for Concurrent Load

When a source factory creates sources on an executor node, by default, it creates one thread per source. If your sources can use multiple threads, implement getDesiredThreads(). Vertica calls this method before it calls prepareUDSources(), so you can also use it to decide how many sources to create. Return the number of threads your factory can use for sources. The maximum number of available threads is passed in, so you can take that into account. The value your method returns is a hint, not a guarantee; each executor node determines the number of threads to allocate. The FilePortionSourceFactory example implements this method; see C++ Example: Concurrent Load.

You can allow your source to have control over parallelism, meaning that it can divide a single input into multiple load streams, by implementing isSourceApportionable(). Returning true from this method does not guarantee that the source will apportion the load. However, returning false indicates that it will not try to do so. See Apportioned Load for more information.

Often, a SourceFactory that implements getDesiredThreads() also uses apportioned load. However, using apportioned load is not a requirement. A source reading from Kafka streams, for example, could use multiple threads without ssapportioning.

API

The SourceFactory API provides the following methods for extension by subclasses:

virtual void plan(ServerInterface &srvInterface, NodeSpecifyingPlanContext &planCtxt);

// must implement exactly one of prepareUDSources() or prepareUDSourcesExecutor()
virtual std::vector< UDSource * > prepareUDSources(ServerInterface &srvInterface, 
			NodeSpecifyingPlanContext &planCtxt);
				
virtual std::vector< UDSource * > prepareUDSourcesExecutor(ServerInterface &srvInterface, 
			ExecutorPlanContext &planCtxt);
				
virtual void getParameterType(ServerInterface &srvInterface, 
			SizedColumnTypes &parameterTypes);
				
virtual bool isSourceApportionable();
				
ssize_t getDesiredThreads(ServerInterface &srvInterface, 
			ExecutorPlanContext &planContext);

After creating your SourceFactory, you must register it with the RegisterFactory macro.

The SourceFactory API provides the following methods for extension by subclasses:

public void plan(ServerInterface srvInterface, NodeSpecifyingPlanContext planCtxt) 
	throws UdfException;

// must implement one overload of prepareUDSources()				
public ArrayList< UDSource > prepareUDSources(ServerInterface srvInterface, 
				NodeSpecifyingPlanContext planCtxt) 
	throws UdfException;
				
public ArrayList< UDSource > prepareUDSources(ServerInterface srvInterface, 
				ExecutorPlanContext planCtxt)
	throws UdfException;
				
public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);
				
public boolean isSourceApportionable();
				
public int getDesiredThreads(ServerInterface srvInterface,
				ExecutorPlanContext planCtxt) 
	throws UdfException;