Source Example: Concurrent Load

The FilePortionSource example demonstrates the use of concurrent load. This example is a refinement of the FileSource example. Each input file is divided into portions and distributed to FilePortionSource instances. The source accepts a list of offsets at which to break the input into portions; if offsets are not provided, the source divides the input dynamically.

Concurrent load is handled in the factory, so this discussion focuses on FilePortionSourceFactory. The full code for the example is located in /opt/vertica/sdk/examples/ApportionLoadFunctions. The distribution also includes a Java version of this example.

Loading and Using the Example

Load and use the FilePortionSource example as follows.

=> CREATE LIBRARY FilePortionLib AS '/home/dbadmin/FP.so';

=> CREATE SOURCE FilePortionSource AS LANGUAGE 'C++'
-> NAME 'FilePortionSourceFactory' LIBRARY FilePortionLib;

=> COPY t WITH SOURCE FilePortionSource(file='g1/*.dat', nodes='initiator,e0,e1', offsets='0,380000,820000');

=> COPY t WITH SOURCE FilePortionSource(file='g2/*.dat', nodes='e0,e1,e2', local_min_portion_size=2097152);

Implementation

Concurrent load affects the source factory in two places: getDesiredThreads() and prepareUDSourcesExecutor().

getDesiredThreads()

The getDesiredThreads() member function determines the number of threads to request. Vertica calls this member function on each executor node before calling prepareUDSourcesExecutor().

The function begins by breaking an input file path, which might be a glob, into individual paths. This discussion omits those details. If apportioned load is not being used, then the function allocates one source per file.

virtual ssize_t getDesiredThreads(ServerInterface &srvInterface,
                                  ExecutorPlanContext &planCtxt) {
  const std::string filename = srvInterface.getParamReader().getStringRef("file").str();

  std::vector<std::string> paths;
  // expand the glob - at least one thread per source
  ...

  // figure out how to assign files to sources
  const std::string nodeName = srvInterface.getCurrentNodeName();
  const size_t nodeId = planCtxt.getWriter().getIntRef(nodeName);
  const size_t numNodes = planCtxt.getTargetNodes().size();

  if (!planCtxt.canApportionSource()) {
    /* no apportioning, so the number of files is the final number of sources */
    std::vector<std::string> *expanded =
        vt_createFuncObject<std::vector<std::string> >(srvInterface.allocator, paths);
    /* save expanded paths so we don't have to compute expansion again */
    planCtxt.getWriter().setPointer("expanded", expanded);
    return expanded->size();
  }

  // ...
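The glob expansion elided above can be sketched outside the SDK with POSIX glob(3). The helper name `expandGlob` is illustrative only, not part of the example:

```cpp
#include <glob.h>
#include <string>
#include <vector>

// Illustrative stand-in for the elided glob expansion: turn a pattern
// such as "g1/*.dat" into the list of matching paths. A pattern with
// no matches yields an empty list.
static std::vector<std::string> expandGlob(const std::string &pattern) {
    std::vector<std::string> paths;
    glob_t g;
    if (::glob(pattern.c_str(), 0, NULL, &g) == 0) {
        for (size_t i = 0; i < g.gl_pathc; ++i) {
            paths.push_back(g.gl_pathv[i]);
        }
        globfree(&g);
    }
    return paths;
}
```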

If the source can be apportioned, then getDesiredThreads() uses the offsets that were passed as arguments to the factory to divide the file into portions. It then allocates portions to available nodes. This function does not create the sources directly; it does this work only to determine how many threads to request.

  else if (srvInterface.getParamReader().containsParameter("offsets")) {

    // if the offsets are specified, then we will have a fixed number of portions per file.
    // Round-robin assign offsets to nodes.
    // ...

    /* Construct the portions that this node will actually handle.
     * This isn't changing (since the offset assignments are fixed),
     * so we'll build the Portion objects now and make them available
     * to prepareUDSourcesExecutor() by putting them in the ExecutorContext.
     *
     * We don't know the size of the last portion, since it depends on the file
     * size.  Rather than figure it out here we will indicate it with -1 and
     * defer that to prepareUDSourcesExecutor().
     */
    std::vector<Portion> *portions =
        vt_createFuncObject<std::vector<Portion> >(srvInterface.allocator);

    for (std::vector<size_t>::const_iterator offset = offsets.begin();
            offset != offsets.end(); ++offset) {
        Portion p(*offset);
        p.is_first_portion = (offset == offsets.begin());
        p.size = (offset + 1 == offsets.end() ? -1 : (*(offset + 1) - *offset));

        if ((offset - offsets.begin()) % numNodes == nodeId) {
            portions->push_back(p);
            srvInterface.log("FilePortionSource: assigning portion %ld: [offset = %lld, size = %lld]",
                    offset - offsets.begin(), p.offset, p.size);
        }
    }

The function now has all the portions and thus the number of portions:

    planCtxt.getWriter().setPointer("portions", portions);

    /* total number of threads we want is the number of portions per file, which is fixed */
    return portions->size() * expanded->size();
  } // end of "offsets" parameter
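The round-robin assignment used above reduces to a simple modulus test. This tiny stand-alone helper (illustrative only, not SDK code) captures the rule:

```cpp
#include <cstddef>

// Round-robin assignment: portion i belongs to the node whose index
// equals i modulo the number of participating nodes.
static bool ownsPortion(std::size_t portionIndex, std::size_t nodeId,
                        std::size_t numNodes) {
    return portionIndex % numNodes == nodeId;
}
```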

If offsets were not provided, the function divides the file into portions dynamically, one portion per thread. This discussion omits the details of this computation. There is no point in requesting more threads than are available, so the function calls getMaxAllowedThreads() on the ExecutorPlanContext (an argument to the function) to set an upper bound:

  if (portions->size() >= planCtxt.getMaxAllowedThreads()) {
    return paths.size();
  }

See the full example for the details of how this function divides the file into portions.
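Assuming the dynamic path divides each file into roughly equal contiguous byte ranges (the details are elided above), the arithmetic might look like the following sketch. `SimplePortion` and `divideFile` are illustrative names, not the SDK's:

```cpp
#include <cstddef>
#include <vector>

// Illustrative portion descriptor: a contiguous byte range of a file.
struct SimplePortion {
    std::size_t offset;
    std::size_t size;
};

// Divide a file of fileSize bytes into numPortions contiguous portions
// of near-equal size; the first (fileSize % numPortions) portions get
// one extra byte so every byte is covered exactly once.
static std::vector<SimplePortion> divideFile(std::size_t fileSize,
                                             std::size_t numPortions) {
    std::vector<SimplePortion> portions;
    const std::size_t base  = fileSize / numPortions;
    const std::size_t extra = fileSize % numPortions;
    std::size_t offset = 0;
    for (std::size_t i = 0; i < numPortions; ++i) {
        SimplePortion p;
        p.offset = offset;
        p.size = base + (i < extra ? 1 : 0);
        portions.push_back(p);
        offset += p.size;
    }
    return portions;
}
```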

This function uses the vt_createFuncObject template to create objects. Vertica calls the destructors of the UDSource objects returned by the factory, but it does not call the destructors of other objects created with this template, such as the vectors of paths and portions. You must call these destructors yourself to avoid memory leaks. In this example, these calls are made in prepareUDSourcesExecutor().
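The ownership rule above (an object placement-constructed in memory owned elsewhere needs an explicit destructor call, never delete) can be demonstrated in plain C++. The function `countThenDestroy` is purely illustrative:

```cpp
#include <cstddef>
#include <new>
#include <string>
#include <vector>

// The buffer stands in for memory obtained from srvInterface.allocator:
// the storage is owned elsewhere, so only the destructor call is our job.
static std::size_t countThenDestroy() {
    alignas(std::vector<std::string>)
        unsigned char buf[sizeof(std::vector<std::string>)];

    // Placement-construct the vector in the externally owned storage.
    std::vector<std::string> *paths = new (buf) std::vector<std::string>();
    paths->push_back("g1/a.dat");
    paths->push_back("g1/b.dat");

    const std::size_t n = paths->size();

    // Explicit destructor call; omitting it would leak the strings.
    paths->~vector();
    return n;
}
```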

prepareUDSourcesExecutor()

The prepareUDSourcesExecutor() member function, like getDesiredThreads(), has separate blocks of code depending on whether offsets are provided. In both cases, the function breaks input into portions and creates UDSource instances for them.

If the function is called with offsets, prepareUDSourcesExecutor() calls prepareCustomizedPortions(). This function follows.

/* prepare portions as determined via the "offsets" parameter */
void prepareCustomizedPortions(ServerInterface &srvInterface,
                               ExecutorPlanContext &planCtxt,
                               std::vector<UDSource *> &sources,
                               const std::vector<std::string> &expandedPaths,
                               std::vector<Portion> &portions) {
    for (std::vector<std::string>::const_iterator filename = expandedPaths.begin();
            filename != expandedPaths.end(); ++filename) {
        /* 
         * the "portions" vector contains the portions which were generated in
         * "getDesiredThreads"
         */
        const size_t fileSize = getFileSize(*filename);
        for (std::vector<Portion>::const_iterator portion = portions.begin();
                portion != portions.end(); ++portion) {
            Portion fportion(*portion);
            if (fportion.size == -1) {
                /* as described above, this means from the offset to the end */
                fportion.size = fileSize - portion->offset;
                sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
                            *filename, fportion));
            } else if (fportion.size > 0) {
                sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
                            *filename, fportion));
            }
        }
    }
}

If prepareUDSourcesExecutor() is called without offsets, then it must decide how many portions to create.

The base case is to use one portion per source. However, if extra threads are available, the function divides the input into more portions so that a source can process them concurrently. Then prepareUDSourcesExecutor() calls prepareGeneratedPortions() to create the portions. This function begins by calling getLoadConcurrency() on the plan context to find out how many threads are available.

void prepareGeneratedPortions(ServerInterface &srvInterface,
                              ExecutorPlanContext &planCtxt,
                              std::vector<UDSource *> &sources,
                              std::map<std::string, Portion> initialPortions) {

  if ((ssize_t) initialPortions.size() >= planCtxt.getLoadConcurrency()) {
    /* all threads will be used, don't bother splitting into portions */
    for (std::map<std::string, Portion>::const_iterator file = initialPortions.begin();
            file != initialPortions.end(); ++file) {
      sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
              file->first, file->second));
    } // for
    return;
  } // if

  // Now we can split files to take advantage of potentially-unused threads.
  // First sort by size (descending), then we will split the largest first.

  // details elided...

}
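One plausible shape for the elided splitting step, assuming the largest remaining portion is repeatedly halved until the count reaches the available concurrency, is sketched below. `splitToConcurrency` is an illustrative name, and the real example operates on Portion objects rather than bare sizes:

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Grow a list of portion sizes until it matches the available load
// concurrency by repeatedly halving the largest remaining portion.
// Stops early if nothing is left worth splitting.
static std::vector<std::size_t> splitToConcurrency(std::vector<std::size_t> sizes,
                                                   std::size_t concurrency) {
    while (sizes.size() < concurrency) {
        std::vector<std::size_t>::iterator largest =
            std::max_element(sizes.begin(), sizes.end());
        if (*largest < 2) break;          // can't split a portion below one byte
        const std::size_t half = *largest / 2;
        *largest -= half;
        sizes.push_back(half);
    }
    return sizes;
}
```

Halving the largest portion first keeps the per-thread work balanced without re-sorting on every iteration.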
			

For More Information

See the source code for the full implementation of this example.