Parser Example: Numeric Text
This NumericTextParser
example parses integer values spelled out in words rather than digits (for example "one two three" for one-hundred twenty three). The parser:
- Accepts a single parameter to set the character that separates columns in a row of data. The separator defaults to the pipe (|) character.
- Ignores extra spaces and the capitalization of the words used to spell out the digits.
- Recognizes the digits using the following words: zero, one, two, three, four, five, six, seven, eight, nine.
- Assumes that the words spelling out an integer are separated by at least one space.
- Rejects any row of data that cannot be completely parsed into integers.
- Generates an error, if the output table has a non-integer column.
Loading and Using the Example
Load and use the parser as follows:
=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaLib.jar' LANGUAGE 'JAVA'; CREATE LIBRARY => CREATE PARSER NumericTextParser AS LANGUAGE 'java' -> NAME 'com.myCompany.UDParser.NumericTextParserFactory' -> LIBRARY JavaLib; CREATE PARSER FUNCTION => CREATE TABLE t (i INTEGER); CREATE TABLE => COPY t FROM STDIN WITH PARSER NumericTextParser(); Enter data to be copied followed by a newline. End with a backslash and a period on a line by itself. >> One >> Two >> One Two Three >> \. => SELECT * FROM t ORDER BY i; i ----- 1 2 123 (3 rows) => DROP TABLE t; DROP TABLE => -- Parse multi-column input => CREATE TABLE t (i INTEGER, j INTEGER); CREATE TABLE => COPY t FROM stdin WITH PARSER NumericTextParser(); Enter data to be copied followed by a newline. End with a backslash and a period on a line by itself. >> One | Two >> Two | Three >> One Two Three | four Five Six >> \. => SELECT * FROM t ORDER BY i; i | j -----+----- 1 | 2 2 | 3 123 | 456 (3 rows) => TRUNCATE TABLE t; TRUNCATE TABLE => -- Use alternate separator character => COPY t FROM STDIN WITH PARSER NumericTextParser(separator='*'); Enter data to be copied followed by a newline. End with a backslash and a period on a line by itself. >> Five * Six >> seven * eight >> nine * one zero >> \. => SELECT * FROM t ORDER BY i; i | j ---+---- 5 | 6 7 | 8 9 | 10 (3 rows) => TRUNCATE TABLE t; TRUNCATE TABLE => -- Rows containing data that does not parse into digits is rejected. => DROP TABLE t; DROP TABLE => CREATE TABLE t (i INTEGER); CREATE TABLE => COPY t FROM STDIN WITH PARSER NumericTextParser(); Enter data to be copied followed by a newline. End with a backslash and a period on a line by itself. >> One Zero Zero >> Two Zero Zero >> Three Zed Zed >> Four Zero Zero >> Five Zed Zed >> \. SELECT * FROM t ORDER BY i; i ----- 100 200 400 (3 rows) => -- Generate an error by trying to copy into a table with a non-integer column => DROP TABLE t; DROP TABLE => CREATE TABLE t (i INTEGER, j VARCHAR); CREATE TABLE => COPY t FROM STDIN WITH PARSER NumericTextParser(); vsql:UDParse.sql:94: ERROR 3399: Failure in UDx RPC call InvokeGetReturnTypeParser(): Error in User Defined Object [NumericTextParser], error code: 0 com.vertica.sdk.UdfException: Column 2 of output table is not an Int at com.myCompany.UDParser.NumericTextParserFactory.getParserReturnType (NumericTextParserFactory.java:70) at com.vertica.udxfence.UDxExecContext.getReturnTypeParser( UDxExecContext.java:1464) at com.vertica.udxfence.UDxExecContext.getReturnTypeParser( UDxExecContext.java:768) at com.vertica.udxfence.UDxExecContext.run(UDxExecContext.java:236) at java.lang.Thread.run(Thread.java:662)
Parser Implementation
The following code implements the parser.
package com.myCompany.UDParser; import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; import com.vertica.sdk.DataBuffer; import com.vertica.sdk.DestroyInvocation; import com.vertica.sdk.RejectedRecord; import com.vertica.sdk.ServerInterface; import com.vertica.sdk.State.InputState; import com.vertica.sdk.State.StreamState; import com.vertica.sdk.StreamWriter; import com.vertica.sdk.UDParser; import com.vertica.sdk.UdfException; public class NumericTextParser extends UDParser { private String separator; // Holds column separator character // List of strings that we accept as digits. private List<String> numbers = Arrays.asList("zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"); // Hold information about the last rejected row. private String rejectedReason; private String rejectedRow; // Constructor gets the separator character from the Factory's prepare() // method. public NumericTextParser(String sepparam) { super(); this.separator = sepparam; } // Called to perform the actual work of parsing. Gets a buffer of bytes // to turn into tuples. @Override public StreamState process(ServerInterface srvInterface, DataBuffer input, InputState input_state) throws UdfException, DestroyInvocation { int i=input.offset; // Current position in the input buffer // Flag to indicate whether we just found the end of a row. boolean lastCharNewline = false; // Buffer to hold the row of data being read. StringBuffer line = new StringBuffer(); //Continue reading until end of buffer. for(; i < input.buf.length; i++){ // Loop through input until we find a linebreak: marks end of row char inchar = (char) input.buf[i]; // Note that this isn't a robust way to find rows. It should // accept a user-defined row separator. Also, the following // assumes ASCII line break metheods, which isn't a good idea // in the UTF world. But it is good enough for this simple example. if (inchar != '\n' && inchar != '\r') { // Keep adding to a line buffer until a full row of data is read line.append(inchar); lastCharNewline = false; // Last character not a new line } else { // Found a line break. Process the row. lastCharNewline = true; // indicate we got a complete row // Update the position in the input buffer. This is updated // whether the row is successfully processed or not. input.offset = i+1; // Call procesRow to extract values and write tuples to the // output. Returns false if there was an error. if (!processRow(line)) { // Processing row failed. Save bad row to rejectedRow field // and return to caller indicating a rejected row. rejectedRow = line.toString(); // Update position where we processed the data. return StreamState.REJECT; } line.delete(0, line.length()); // clear row buffer } } // At this point, process() has finished processing the input buffer. // There are two possibilities: need to get more data // from the input stream to finish processing, or there is // no more data to process. If at the end of the input stream and // the row was not terminated by a linefeed, it may need // to process the last row. if (input_state == InputState.END_OF_FILE && lastCharNewline) { // End of input and it ended on a newline. Nothing more to do return StreamState.DONE; } else if (input_state == InputState.END_OF_FILE && !lastCharNewline) { // At end of input stream but didn't get a final newline. Need to // process the final row that was read in, then exit for good. if (line.length() == 0) { // Nothing to process. Done parsing. return StreamState.DONE; } // Need to parse the last row, not terminated by a linefeed. This // can occur if the file being read didn't have a final line break. if (processRow(line)) { return StreamState.DONE; } else { // Processing last row failed. Save bad row to rejectedRow field // and return to caller indicating a rejected row. rejectedRow = line.toString(); // Tell Vertica the entire buffer was processed so it won't // call again to have the line processed. input.offset = input.buf.length; return StreamState.REJECT; } } else { // Stream is not fully read, so tell Vertica to send more. If // process() didn't get a complete row before it hit the end of the // input buffer, it will end up re-processing that segment again // when more data is added to the buffer. return StreamState.INPUT_NEEDED; } } // Breaks a row into columns, then parses the content of the // columns. Returns false if there was an error parsing the // row, in which case it sets the rejected row to the input // line. Returns true if the row was successfully read. private boolean processRow(StringBuffer line) throws UdfException, DestroyInvocation { String[] columns = line.toString().split(Pattern.quote(separator)); // Loop through the columns, decoding their contents for (int col = 0; col < columns.length; col++) { // Call decodeColumn to extract value from this column Integer colval = decodeColumn(columns[col]); if (colval == null) { // Could not parse one of the columns. Indicate row should // be rejected. return false; } // Column parsed OK. Write it to the output. writer is a field // provided by the parent class. Since this parser only accepts // integers, there is no need to verify that data type of the parsed // data matches the data type of the column being written. In your // UDParsers, you may want to perform this verification. writer.setLong(col,colval); } // Done with the row of data. Advance output to next row. // Note that this example does not verify that all of the output columns // have values assigned to them. If there are missing values at the // end of a row, they get automatically get assigned a default value // (0 for integers). This isn't a robust solution. Your UDParser // should perform checks here to handle this situation and set values // (such as null) when appropriate. writer.next(); return true; // Successfully processed the row. } // Gets a string with text numerals, i.e. "One Two Five Seven" and turns // it into an integer value, i.e. 1257. Returns null if the string could not // be parsed completely into numbers. private Integer decodeColumn(String text) { int value = 0; // Hold the value being parsed. // Split string into individual words. Eat extra spaces. String[] words = text.toLowerCase().trim().split("\\s+"); // Loop through the words, matching them against the list of // digit strings. for (int i = 0; i < words.length; i++) { if (numbers.contains(words[i])) { // Matched a digit. Add the it to the value. int digit = numbers.indexOf(words[i]); value = (value * 10) + digit; } else { // The string didn't match one of the accepted string values // for digits. Need to reject the row. Set the rejected // reason string here so it can be incorporated into the // rejected reason object. // // Note that this example does not handle null column values. // In most cases, you want to differentiate between an // unparseable column value and a missing piece of input // data. This example just rejects the row if there is a missing // column value. rejectedReason = String.format( "Could not parse '%s' into a digit",words[i]); return null; } } return value; } // Vertica calls this method if the parser rejected a row of data // to find out what went wrong and add to the proper logs. Just gathers // information stored in fields and returns it in an object. @Override public RejectedRecord getRejectedRecord() throws UdfException { return new RejectedRecord(rejectedReason,rejectedRow.toCharArray(), rejectedRow.length(), "\n"); } }
ParserFactory Implementation
The following code implements the parser factory.
NumericTextParser
accepts a single optional parameter named separator
. This parameter is defined in the getParameterType()
method, and the plan()
method stores its value. NumericTextParser
outputs only integer values. Therefore, if the output table contains a column whose data type is not integer, the getParserReturnType()
method throws an exception.
package com.myCompany.UDParser; import java.util.regex.Pattern; import com.vertica.sdk.ParamReader; import com.vertica.sdk.ParamWriter; import com.vertica.sdk.ParserFactory; import com.vertica.sdk.PerColumnParamReader; import com.vertica.sdk.PlanContext; import com.vertica.sdk.ServerInterface; import com.vertica.sdk.SizedColumnTypes; import com.vertica.sdk.UDParser; import com.vertica.sdk.UdfException; import com.vertica.sdk.VerticaType; public class NumericTextParserFactory extends ParserFactory { // Called once on the initiator host to check the parameters and set up the // context data that hosts performing processing will need later. @Override public void plan(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader, PlanContext planCtxt) { String separator = "|"; // assume separator is pipe character // See if a parameter was given for column separator ParamReader args = srvInterface.getParamReader(); if (args.containsParameter("separator")) { separator = args.getString("separator"); if (separator.length() > 1) { throw new UdfException(0, "Separator parameter must be a single character"); } if (Pattern.quote(separator).matches("[a-zA-Z]")) { throw new UdfException(0, "Separator parameter cannot be a letter"); } } // Save separator character in the Plan Data ParamWriter context = planCtxt.getWriter(); context.setString("separator", separator); } // Define the data types of the output table that the parser will return. // Mainly, this just ensures that all of the columns in the table which // is the target of the data load are integer. @Override public void getParserReturnType(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader, PlanContext planCtxt, SizedColumnTypes argTypes, SizedColumnTypes returnType) { // Get access to the output table's columns for (int i = 0; i < argTypes.getColumnCount(); i++ ) { if (argTypes.getColumnType(i).isInt()) { // Column is integer... add it to the output returnType.addInt(argTypes.getColumnName(i)); } else { // Column isn't an int, so throw an exception. // Technically, not necessary since the // UDx framework will automatically error out when it sees a // Discrepancy between the type in the target table and the // types declared by this method. Throwing this exception will // provide a clearer error message to the user. String message = String.format( "Column %d of output table is not an Int", i + 1); throw new UdfException(0, message); } } } // Instantiate the UDParser subclass named NumericTextParser. Passes the // separator characetr as a paramter to the constructor. @Override public UDParser prepare(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader, PlanContext planCtxt, SizedColumnTypes returnType) throws UdfException { // Get the separator character from the context String separator = planCtxt.getReader().getString("separator"); return new NumericTextParser(separator); } // Describe the parameters accepted by this parser. @Override public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes) { parameterTypes.addVarchar(1, "separator"); } }