Saturday, December 22, 2012

Python code to simplify loading data into SciDB

Summary:  I've written some Python code that simplifies loading data from a csv file into SciDB. For each column in the csv file, the programmer specifies whether it should be an attribute or a dimension in the SciDB array. The code then loads the data as a raw array, creates the destination array based on the provided specifications and the data loaded into raw, and finally transfers the data from the raw array to the destination array.

I've added the code to this GitHub repository under the directory ScidbLoader:

TODO
  1. When calculating dimension chunk sizes, the code needs to scale by the number of attributes; it currently assumes one attribute.

Some terminology:
  • csv file - the starting point for this process (it can be generated by export from relational databases, spreadsheet software, etc.).  Each line of the file contains one row of data, with values separated by commas.  The first line of the file contains the name of each column of data.
  • scidb file - a text file generated from the csv file, with slightly different formatting that is readable by SciDB (see the example below)
  • raw array - the array that data is initially loaded into from the scidb file
  • destination array - the array that the data ends up in at the end of the process.  This array is "fit for purpose": it has the desired structure, attributes, dimensions, etc.
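To make the first two terms concrete: suppose a (hypothetical) test_data.csv contains two intended dimension columns and one attribute column:

a,b,c
0,0,1.5
0,1,2.5
1,0,3.5
1,1,4.5

The corresponding scidb file produced by csv2scidb looks roughly like the following (the exact output depends on the options csv2scidb is given):

{0}[
(0,0,1.5),
(0,1,2.5),
(1,0,3.5),
(1,1,4.5)
]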
The overall process to load data into SciDB looks something like this:
  1. Generate a csv file containing the data to be loaded
  2. Convert the csv file into a scidb file using csv2scidb
  3. Load it into a raw array in SciDB
    • each column in the csv file, whether it is ultimately an attribute or a dimension, is an attribute in the raw array
    • choose a data type for each csv column that matches the data type you want it to have in the destination array
    • the dimension for the raw array is the line number from the file
  4. Create the destination array
    • choose which of the columns in the original csv file are dimensions and which are attributes
    • based on the number of attributes, the size of each dimension, and the intended use, choose dimension chunk sizes such that each chunk contains roughly 1,000,000 cells (see the sketch after this list)
    • I create the destination array after loading the data in case I want to transform the "raw" data - e.g. subtracting an offset so that dimension indexes start at 0.  I can use SciDB to query the raw data as I design the destination array
  5. Transfer the data from the raw array to the destination array
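To make the chunk sizing in step 4 concrete, here is a minimal sketch of one way to split a cell budget across dimensions. This is an illustration rather than the exact code from the repository (the real calculation lives in the internal methods mentioned at the end of this post), though unlike the current code it does divide the budget by the number of attributes, per the TODO above:

import math

def calculate_chunk_sizes(dim_extents, num_attributes=1, target_cells=1000000):
    """Pick a chunk size for each dimension so that the product of the
    chunk sizes is roughly target_cells / num_attributes.
    dim_extents is a list of (max - min + 1) for each dimension.
    A sketch: it splits the cell budget evenly across dimensions and
    caps each chunk size at the full extent of its dimension.
    """
    budget = float(target_cells) / num_attributes
    per_dimension = budget ** (1.0 / len(dim_extents))
    return [int(min(extent, math.ceil(per_dimension))) for extent in dim_extents]

# e.g. two int64 dimensions spanning 10,000 and 500 distinct values:
print calculate_chunk_sizes([10000, 500])   # -> [1000, 500]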
I've attempted to automate the above in some Python classes, which I'll describe below.  First, a walk-through of the abridged test code / example usage (ScidbLoader/scidb_load/test/test_scidb_loader.py):



import sys
sys.path.append('/opt/scidb/12.10/lib/') 
import scidbapi
import scidb_load.scidb_loader as scidb_loader
import unittest
import scidb_load.scidb_load_column as scidb_load_column
import commands

class TestScidbLoader(unittest.TestCase):
    def test_load(self):
        array_name = "testScidbLoaderArray"

        scidb = scidbapi.connect("localhost", 1239)
        
        first_dimension = "a"
        second_dimension = "b"
        attribute = "c"

        #create the list of columns to be loaded
        scidb_load_column_list = [scidb_load_column.ScidbLoadColumn(first_dimension, "int64", False),
                                  scidb_load_column.ScidbLoadColumn(second_dimension, "int64", False),
                                  scidb_load_column.ScidbLoadColumn(attribute, "double", True)]

        loader = scidb_loader.ScidbLoader(scidb)
        
        #get the path to the csv holding the test data
        csv_file = "".join([commands.getoutput("pwd"), "/test_data.csv"])
        
        #load the data from the csv file into the database
        loader.load(scidb_load_column_list, csv_file, array_name)

        scidb.disconnect()



Note that I've removed the actual test assertions from the above to highlight the key points that are relevant to using ScidbLoader.  In this example, we are loading data that we intend to have two dimensions (both of data type int64) and one attribute (of data type double).
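Given those columns, the create statement that ScidbLoader eventually issues for the destination array has the following shape (the bounds and chunk sizes shown here are placeholders; the real values are computed from the loaded data):

create array testScidbLoaderArray <c:double> [a=0:999,1000,0, b=0:999,1000,0]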

Key to the above usage is the ScidbLoadColumn class, which stores each column's name, data type, and whether or not it is an attribute.  The class also holds the dimension parameters - the minimum value, maximum value, chunk size, and overlap used when the column is a dimension - though these are not used in the example above.
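I haven't reproduced ScidbLoadColumn's source in this post, but based on how the test code constructs it and how ScidbLoader calls it below, a minimal sketch would look something like this (the constructor arguments and the as_attribute/as_dimension methods come from the code shown in this post; the names of the dimension-parameter fields are my assumptions):

class ScidbLoadColumn:
    """One csv column and how it maps into the destination array.
    A sketch based on how ScidbLoader uses the class; the dimension
    field names are assumptions.
    """

    def __init__(self, name, data_type, is_attribute):
        self.name = name
        self.data_type = data_type
        self.is_attribute = is_attribute
        # dimension parameters - only meaningful when is_attribute is
        # False; filled in by ScidbLoader's dimension calculation if
        # not set here
        self.min_value = None
        self.max_value = None
        self.chunk_size = None
        self.overlap = 0

    def as_attribute(self):
        """Render as SciDB attribute syntax, e.g. 'c:double'."""
        return "{}:{}".format(self.name, self.data_type)

    def as_dimension(self):
        """Render as SciDB dimension syntax, e.g. 'a=0:999,1000,0'
        (name=min:max,chunk_size,overlap)."""
        return "{}={}:{},{},{}".format(self.name, self.min_value,
                                       self.max_value, self.chunk_size,
                                       self.overlap)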

Here is the code of the ScidbLoader class, which implements the process described above (ScidbLoader/scidb_load/scidb_loader.py):


import csv_to_scidb
import _utils_scidb_load

class ScidbLoader:
    __target_chunk_size = 1000000
    __raw_array = "raw"

    __scidb = None

    __do_calculate_dimension_params = True

    _utils = None

    def __init__(self, scidb):
        self.__scidb = scidb
        self._utils = _utils_scidb_load.UtilsScidbLoad(self.__scidb)


    def load(self, scidb_load_col_list, csv_file, array_name):
        """Load data from the indicated csv file (csv_file, assumed to
        have a header row) into a SciDB array named array_name with the
        attributes and dimensions specified by scidb_load_col_list (a
        list of ScidbLoadColumn).  Will attempt to build chunks that
        hold roughly __target_chunk_size (1,000,000) cells.
        TODO:  need to divide the target chunk size by the number of attributes
        """
     
        print "generate scidb file from csv file"
        scidb_file = csv_to_scidb.convert_csv_to_scidb(csv_file)

        self._utils.remove_array_if_present(self.__raw_array)
     
        raw_attr_list = [scidb_load_col.as_attribute() for scidb_load_col in scidb_load_col_list]

        raw_attr_query = "create array {} <{}> [line=0:*,1000000,0]".format(self.__raw_array, ", ".join(raw_attr_list))

        result = self.__scidb.executeQuery(raw_attr_query)
        self.__scidb.completeQuery(result.queryID)

        print "load data into raw in scidb"
        result = self.__scidb.executeQuery("load ({}, '{}')".format(self.__raw_array, scidb_file))
        self.__scidb.completeQuery(result.queryID)

        if self.__do_calculate_dimension_params:
            self._calculate_dimensions(scidb_load_col_list)

        attribute_string = ", ".join([attr_col.as_attribute() for attr_col in
                                      filter(lambda col: col.is_attribute, scidb_load_col_list)])
        #print attribute_string
        dimension_string = ", ".join([dim_col.as_dimension() for dim_col in
                                      filter(lambda col: not col.is_attribute, scidb_load_col_list)])
        #print dimension_string

        create_query = "create array {} <{}> [{}]".format(array_name,
                                                          attribute_string, dimension_string)
        #print create_query

        result = self.__scidb.executeQuery(create_query)
        self.__scidb.completeQuery(result.queryID)

        print "redimension_store"
        result = self.__scidb.executeQuery("redimension_store({}, {})"
                                           .format(self.__raw_array, array_name))
        self.__scidb.completeQuery(result.queryID)

Note that I have not included the internal methods in this post (which is where a lot of the "fun" of calculating dimension chunk sizes occurs).
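To give a flavor of those internals, here is roughly what the conversion helper used at the top of load() (csv_to_scidb.convert_csv_to_scidb) could look like if it shells out to SciDB's csv2scidb tool.  This is a sketch, not the repository's code, and the csv2scidb flags are assumptions - check the tool's help text for your SciDB version:

import commands
import os

def convert_csv_to_scidb(csv_file):
    """Convert a csv file (with a header row) into a scidb-format text
    file by shelling out to csv2scidb.  A sketch: the flags below
    (-s 1 to skip the header row, -p to describe the column types,
    N = number) are assumptions.
    """
    scidb_file = os.path.splitext(csv_file)[0] + ".scidb"
    command = "csv2scidb -s 1 -p NNN < {} > {}".format(csv_file, scidb_file)
    status, output = commands.getstatusoutput(command)
    if status != 0:
        raise RuntimeError("csv2scidb failed: {}".format(output))
    return scidb_file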


