DataLakeBlog

I needed to process some NOAA data in Data Lake Analytics. The data is historical weather data, and I discovered that its format is old-school - fixed field widths and no delimiters. As I have now realized, the built-in data extractors are limited to the following:

  • CSV - comma separated values
  • TSV - tab separated values
  • Text - general purpose delimited text

Unfortunately, the format of the data file did not match any of these - check out the table below, which describes the format of each row in the data file:

Variable    Columns   Type
ID          1-11      Character
LATITUDE    13-20     Real
LONGITUDE   22-30     Real
ELEVATION   32-37     Real
STATE       39-40     Character
NAME        42-71     Character
GSNFLAG     73-75     Character
HCNFLAG     77-79     Character
WMOID       81-85     Character
METHOD*     87-99     Character
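
The ranges are 1-based and inclusive, so for a range Start-End the zero-based offset is Start - 1 and the width is End - Start + 1. Here is a minimal C# sketch of that slicing (my own illustration, using a sample row from the stations file):

using System;

// Minimal illustration of slicing the 1-based, inclusive ranges above.
public static class FixedWidthSliceDemo
{
    public static void Main()
    {
        // A sample row from the NOAA stations file (trailing fields omitted).
        var line = "AQW00061705 -14.3306 -170.7136    3.7 AS PAGO PAGO WSO AP";

        // ID is columns 1-11: offset 0, width 11.
        var id = line.Substring(0, 11);

        // LATITUDE is columns 13-20: offset 12, width 8.
        var latitude = double.Parse(line.Substring(12, 8).Trim());

        Console.WriteLine($"{id}: {latitude}"); // AQW00061705: -14.3306
    }
}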

A quick search for a fixed-column extractor turned up a blog article from Bryan C Smith that creates an extractor for standard fixed-width columns, but to use it I would have had to think (shudder): converting each inclusive range, such as 1-11, into a field length, accounting for the space between fields, and so on. As I wanted to limit the likelihood of introducing an error, I decided to create another extractor that could handle the column definitions exactly as provided. In a nutshell, I wanted to be able to write U-SQL similar to:

@allStations =
    EXTRACT 
        id string,
        latitude double,
        longitude double,
        elevation double,
        state string,
        name string
    FROM 
        @AllStationsInputPath
    USING new StartEndColumnExtractor(
          new List<ColumnDefinition>{
              new ColumnDefinition{ Name = "id", Start = 1, End = 11 },
              new ColumnDefinition{ Name = "latitude", Start = 13, End = 20 },
              new ColumnDefinition{ Name = "longitude", Start = 22, End = 30 },
              new ColumnDefinition{ Name = "elevation", Start = 32, End = 37 },
              new ColumnDefinition{ Name = "state", Start = 39, End = 40 },
              new ColumnDefinition{ Name = "name", Start = 42, End = 71 },
          },
          rowDelimiter : "\n" // Unix line ending
    );
  

So, in a similar process to that documented by Bryan C Smith:

  1. I installed the Azure Data Lake Tools for Visual Studio.

  2. I created a Class Library (For U-SQL Application).

  3. I added a class that would represent a column definition:

    public class ColumnDefinition
    {
        public string Name { get; set; }
        public int Start { get; set; }
        public int End { get; set; }
    }
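
    Since the extractor will feed these values straight into string.Substring, it can be worth guarding against bad definitions up front. A hypothetical helper (my addition, not part of the original article) might look like:

    using System;

    // Hypothetical guard, reusing the ColumnDefinition class above: reject
    // ranges that would make the extractor's Substring call throw (this
    // assumes the default 1-based positions).
    public static class ColumnDefinitionChecks
    {
        public static void EnsureValid(ColumnDefinition column)
        {
            if (column.Start < 1 || column.End < column.Start)
                throw new ArgumentException($"Invalid range for column '{column.Name}'.");
        }
    }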
          
  4. I added a class that would implement the IExtractor interface:

    //
    // StartEndColumnExtractor
    // Inspired by an article by Bryan C Smith
    // https://blogs.msdn.microsoft.com/data_otaku/2016/10/27/a-fixed-width-extractor-for-azure-data-lake-analytics/
    //

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Text;
    using Microsoft.Analytics.Interfaces;

    namespace CustomMayd.DataLake.Extractors
    {
        [SqlUserDefinedExtractor]
        public class StartEndColumnExtractor : IExtractor
        {
            private readonly List<ColumnDefinition> _columnDefinitions;
            private readonly Encoding _encoding;
            private readonly byte[] _rowDelimiter;
            private readonly bool _startsAtZeroPosition;

            public StartEndColumnExtractor(List<ColumnDefinition> columnDefinitions, bool startsAtZeroPosition = false,
                Encoding encoding = null, string rowDelimiter = "\r\n")
            {
                _columnDefinitions = columnDefinitions;
                _startsAtZeroPosition = startsAtZeroPosition;
                _encoding = encoding ?? Encoding.UTF8;
                _rowDelimiter = _encoding.GetBytes(rowDelimiter);
            }

            public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
            {
                foreach (var currentLine in input.Split(_rowDelimiter))
                {
                    using (var lineReader = new StreamReader(currentLine, _encoding))
                    {
                        var line = lineReader.ReadToEnd();

                        for (var index = 0; index < _columnDefinitions.Count; index++)
                        {
                            var columnDefinition = _columnDefinitions[index];

                            // Positions are 1-based and inclusive by default, so shift
                            // the start back by one; the width is End - Start + 1.
                            var startPosition = columnDefinition.Start - (_startsAtZeroPosition ? 0 : 1);
                            var columnWidth = columnDefinition.End - (columnDefinition.Start - 1);
                            var value = line.Substring(startPosition, columnWidth);

                            // Convert the raw slice according to the .NET type declared
                            // for this column in the EXTRACT schema.
                            switch (output.Schema[index].Type.Name)
                            {
                                case "String":
                                    output.Set(index, value.Trim());
                                    break;
                                case "Int32":
                                    output.Set(index, int.Parse(value));
                                    break;
                                case "Double":
                                    output.Set(index, double.Parse(value));
                                    break;
                                case "Single": // float's .NET type name is "Single", not "Float"
                                    output.Set(index, float.Parse(value));
                                    break;
                                case "DateTime":
                                    output.Set(index, DateTime.Parse(value));
                                    break;
                                default:
                                    throw new Exception($"Unknown data type specified: {output.Schema[index].Type.Name}");
                            }
                        }

                        yield return output.AsReadOnly();
                    }
                }
            }
        }
    }
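
    The switch statement dispatches on the .NET type name that the EXTRACT schema reports for each column. A quick console check (my addition) shows the names it needs to match - in particular, C#'s float reports itself as Single:

    using System;

    // Print the .NET Type.Name for each column type the extractor supports;
    // float's underlying type is System.Single.
    public static class TypeNameCheck
    {
        public static void Main()
        {
            Console.WriteLine(typeof(string).Name);   // String
            Console.WriteLine(typeof(int).Name);      // Int32
            Console.WriteLine(typeof(double).Name);   // Double
            Console.WriteLine(typeof(float).Name);    // Single
            Console.WriteLine(typeof(DateTime).Name); // DateTime
        }
    }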
          
  5. I compiled the class library (in Release mode) and noted the location of the output DLL.

  6. I prefer to have a dedicated U-SQL database where I register my assemblies rather than using the Master database, so I created a DB in U-SQL:

    CREATE DATABASE IF NOT EXISTS WeatherDataDb;
          
  7. To register the assemblies, I use the Data Lake Analytics Explorer view in Visual Studio to connect to my Azure Subscription and:

    • Expand the Data Lake Analytics node
    • Expand the node for my Azure subscription (or the (Local) node if you want to deploy locally)
    • Expand the U-SQL Databases node
    • Right-click on the Assemblies node and select Register Assembly from the context menu
    • Click the ... button to the right of the Choose assembly path field to display the Load Assembly dialog.
    • Ensure Local is selected and click ... to display the Open dialog.
    • Browse to and select the DLL you compiled earlier, then close the Open dialog.
    • On the Load Assembly dialog you should see that the Local Path field is populated - click OK.
    • On the Assembly Registration dialog, you should see that the Load assembly from path and Assembly Name fields are populated - click Submit.

    A job will be submitted to Data Lake that will register the assembly. The Output pane should tell you the job was successful.

  8. Now that the assembly has been registered, the new extractor can be used in a new U-SQL job:

    REFERENCE ASSEMBLY WeatherDataDb.[CustomMayd.DataLake.Extractors];
    
    USING CustomMayd.DataLake.Extractors;
    
    DECLARE @AllStationsInputPath string = "/noaaData/allstations.txt";
    
    DECLARE @OutputFile string = "/output/noaaStations.csv";
    
    @allStations =
        EXTRACT
            id string,
            latitude double,
            longitude double,
            elevation double,
            state string,
            name string
        FROM
            @AllStationsInputPath
        USING new StartEndColumnExtractor(
              new List<ColumnDefinition>{
                  new ColumnDefinition{ Name = "id", Start = 1, End = 11 },
                  new ColumnDefinition{ Name = "latitude", Start = 13, End = 20 },
                  new ColumnDefinition{ Name = "longitude", Start = 22, End = 30 },
                  new ColumnDefinition{ Name = "elevation", Start = 32, End = 37 },
                  new ColumnDefinition{ Name = "state", Start = 39, End = 40 },
                  new ColumnDefinition{ Name = "name", Start = 42, End = 71 },
              },
              rowDelimiter : "\n" // Unix line ending
        );
    
    
    OUTPUT @allStations
    TO @OutputFile
    USING Outputters.Csv(outputHeader : true);
          

When submitted and run, that job reads the following input:

AQC00914000 -14.3167 -170.7667  408.4 AS AASUFOU                                     
AQW00061705 -14.3306 -170.7136    3.7 AS PAGO PAGO WSO AP               GSN     91765
CAW00064757  44.2325  -79.7811  246.0 ON EGBERT 1 W                                  
CQC00914080  15.2136  145.7497  252.1 MP CAPITOL HILL 1      

And creates the following output:

"id","latitude","longitude","elevation","state","name"
"AQC00914000",-14.3167,-170.7667,408.4,"AS","AASUFOU"
"AQW00061705",-14.3306,-170.7136,3.7,"AS","PAGO PAGO WSO AP"
"CAW00064757",44.2325,-79.7811,246,"ON","EGBERT 1 W"
"CQC00914080",15.2136,145.7497,252.1,"MP","CAPITOL HILL 1"

Of course, once you have extracted the content you can perform all the usual Data Lake Analytics magic - I'm just demonstrating the simplest case here: exporting to a CSV.

Thanks to Bryan for writing the article that sent me in the correct direction - it is linked in the resources below.

Resources

  • A fixed-width extractor for Azure Data Lake Analytics (Bryan C Smith): https://blogs.msdn.microsoft.com/data_otaku/2016/10/27/a-fixed-width-extractor-for-azure-data-lake-analytics/
