C++ Example: Delimited Parser and Chunker

The ExampleDelimitedUDChunker class divides an input at delimiter characters. You can use this chunker with any parser that understands delimited input. ExampleDelimitedParser is a ContinuousUDParser subclass that uses this chunker.

Loading and Using the Example

Load and use the example as follows.

=> CREATE LIBRARY ExampleDelimitedParserLib AS '/home/dbadmin/EDP.so';
				
=> CREATE PARSER ExampleDelimitedParser AS
    LANGUAGE 'C++' NAME 'DelimitedParserFrameworkExampleFactory'
    LIBRARY ExampleDelimitedParserLib;
				
=> COPY t FROM stdin WITH PARSER ExampleDelimitedParser();
0
1
2
3
4
5
6
7
8
9
\.	

Chunker Implementation

This chunker supports apportioned load. The alignPortion() method finds the beginning of the first complete record in the current portion and aligns the input buffer with it. The record terminator is passed as an argument and set in the constructor.

StreamState ExampleDelimitedUDChunker::alignPortion(
            ServerInterface &srvInterface, 
            DataBuffer &input, InputState state)
{
    /* find the first record terminator.  Its record belongs to the previous portion */
    void *buf = reinterpret_cast<void *>(input.buf + input.offset);
    void *term = memchr(buf, recordTerminator, input.size - input.offset);

    if (term) {
        /* record boundary found.  Align to the start of the next record */
        const size_t chunkSize = reinterpret_cast<size_t>(term) - reinterpret_cast<size_t>(buf);
        input.offset += chunkSize
            + sizeof(char) /* length of record terminator */;

        /* input.offset points at the start of the first complete record in the portion */
        return DONE;
    } else if (state == END_OF_FILE || state == END_OF_PORTION) {
        return REJECT;
    } else {
        VIAssert(state == START_OF_PORTION || state == OK);
        return INPUT_NEEDED;
    }
}

The process() method has to account for chunks that span portion boundaries. If the previous call was at the end of a portion, the method set a flag. The code begins by checking for and handling that condition. The logic is similar to that of alignPortion(), so the example calls it to do part of the division.

StreamState ExampleDelimitedUDChunker::process(
                ServerInterface &srvInterface,
                DataBuffer &input,
                InputState input_state)
{
    const size_t termLen = 1;
    const char *terminator = &recordTerminator;

    if (pastPortion) {
        /*
         * Previous state was END_OF_PORTION, and the last chunk we will produce
         * extends beyond the portion we started with, into the next portion.
         * To be consistent with alignPortion(), that means finding the first
         * record boundary, and setting the chunk to be at that boundary.
         * Fortunately, this logic is identical to aligning the portion (with
         * some slight accounting for END_OF_FILE)!
         */
        const StreamState findLastTerminator = alignPortion(srvInterface, input);

        switch (findLastTerminator) {
            case DONE:
                return DONE;
            case INPUT_NEEDED:
                if (input_state == END_OF_FILE) {
                    /* there is no more input where we might find a record terminator */
                    input.offset = input.size;
                    return DONE;
                }
                return INPUT_NEEDED;
            default:
                VIAssert("Invalid return state from alignPortion()");
        }
        return findLastTerminator;
    }

Now the method looks for the delimiter. If the input began at the end of a portion, it sets the flag.

    size_t ret = input.offset, term_index = 0;
    for (size_t index = input.offset; index < input.size; ++index) {
        const char c = input.buf[index];
        if (c == terminator[term_index]) {
            ++term_index;
            if (term_index == termLen) {
                ret = index + 1;
                term_index = 0;
            }
            continue;
        } else if (term_index > 0) {
            index -= term_index;
        }

        term_index = 0;
    }

    if (input_state == END_OF_PORTION) {
        /*
         * Regardless of whether or not a record was found, the next chunk will extend
         * into the next portion.
         */
        pastPortion = true;
    }

Finally, process() moves the input offset and returns.

    // if we were able to find some rows, move the offset to point at the start of the next (potential) row, or end of block
    if (ret > input.offset) {
        input.offset = ret;
        return CHUNK_ALIGNED;
    }

    if (input_state == END_OF_FILE) {
        input.offset = input.size;
        return DONE;
    }

    return INPUT_NEEDED;
}

Factory Implementation

The file ExampleDelimitedParser.cpp defines a factory that uses this UDChunker. The chunker supports apportioned load, so the factory implements isChunkerApportionable():

    virtual bool isChunkerApportionable(ServerInterface &srvInterface) {
        ParamReader params = srvInterface.getParamReader();
        if (params.containsParameter("disable_chunker") && params.getBoolRef("d\
isable_chunker")) {
            return false;
        } else {
            return true;
        }
    }
		

The prepareChunker() method creates the chunker:

    virtual UDChunker* prepareChunker(ServerInterface &srvInterface,
                                      PerColumnParamReader &perColumnParamReade\
r,
                                      PlanContext &planCtxt,
                                      const SizedColumnTypes &returnType)
    {
        ParamReader params = srvInterface.getParamReader();
        if (params.containsParameter("disable_chunker") && params.getBoolRef("d\
isable_chunker")) {
            return NULL;
        }

        std::string recordTerminator("\n");

        ParamReader args(srvInterface.getParamReader());
        if (args.containsParameter("record_terminator")) {
            recordTerminator = args.getStringRef("record_terminator").str();
        }

        return vt_createFuncObject<ExampleDelimitedUDChunker>(srvInterface.allo\
cator,
                recordTerminator[0]);
    }