Streaming From the Client Via VerticaCopyStream
The VerticaCopyStream
class lets you stream data from the client system to a Vertica database. It lets you use the SQL COPY statement directly without having to copy the data to a host in the database cluster first by substituting one or more data stream(s) for STDIN.
Notes:
- Use Transactions and disable auto commit on the copy command for better performance.
- Disable auto commit using the copy command with the 'no commit' modifier. You must explicitly disable commits. Enabling transactions does not disable autocommit when using VerticaCopyStream.
- The copy command used with VerticaCopyStream uses copy syntax.
- VerticaCopyStream.rejects is zeroed every time execute is called. If you want to capture the number of rejects, assign the value of VerticaCopyStream.rejects to another variable before calling execute again.
- You can add multiple streams using multiple AddStream() calls.
Example usage:
The following example demonstrates using VerticaCopyStream to copy a file stream into Vertica.
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Data; using System.IO; using Vertica.Data.VerticaClient; namespace ConsoleApplication { class Program { static void Main(string[] args) { // Configure connection properties VerticaConnectionStringBuilder builder = new VerticaConnectionStringBuilder(); builder.Host = "192.168.1.10"; builder.Database = "VMart"; builder.User = "dbadmin"; //open the connection VerticaConnection _conn = new VerticaConnection(builder.ToString()); _conn.Open(); try { using (_conn) { // Start a transaction VerticaTransaction txn = _conn.BeginTransaction(); // Create a table for this example VerticaCommand command = new VerticaCommand("DROP TABLE IF EXISTS copy_table", _conn); command.ExecuteNonQuery(); command.CommandText = "CREATE TABLE copy_table (Last_Name char(50), " + "First_Name char(50),Email char(50), " + "Phone_Number char(15))"; command.ExecuteNonQuery(); // Create a new filestream from the data file string filename = "C:/customers.txt"; Console.WriteLine("\n\nLoading File: " + filename); FileStream inputfile = File.OpenRead(filename); // Define the copy command string copy = "copy copy_table from stdin record terminator E'\n' delimiter '|'" + " enforcelength " + " no commit"; // Create a new copy stream instance with the connection and copy statement VerticaCopyStream vcs = new VerticaCopyStream(_conn, copy); // Start the VerticaCopyStream process vcs.Start(); // Add the file stream vcs.AddStream(inputfile, false); // Execute the copy vcs.Execute(); // Finish stream and write out the list of inserted and rejected rows long rowsInserted = vcs.Finish(); IList<long> rowsRejected = vcs.Rejects; // Does not work when rejected or exceptions defined Console.WriteLine("Number of Rows inserted: " + rowsInserted); Console.WriteLine("Number of Rows rejected: " + rowsRejected.Count); if (rowsRejected.Count > 0) { for (int i = 0; i < rowsRejected.Count; i++) { Console.WriteLine("Rejected row #{0} is row {1}", i, rowsRejected[i]); } } // Commit the changes txn.Commit(); } } catch (Exception e) { Console.WriteLine(e.Message); } //close the connection _conn.Close(); } } }