Ingesting 2 Billion New York City Taxi rides into Kusto (Azure Data Explorer)

Last modified: 03/06/2019

The NYC Taxi & Limousine Commission makes historical data about taxi trips and for-hire-vehicle trips (such as Uber, Lyft, Juno, Via, etc.) available for anyone to download and analyze. These records capture pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts.

Todd W. Schneider took this data to the next level, and in his nyc-taxi-data GitHub repo made it easy to import the data into PostgreSQL, and then use PostGIS for doing spatial calculations over it.

At the time of writing this post (February 2019), the available data set included approximately 1.55 billion Yellow/Green taxi trip records and approximately 0.5 billion for-hire-vehicle trip records - roughly 2 billion records in total.

This is a fair number of records, and to get it ingested and analyzed quickly, I made the natural choice of using Kusto (Azure Data Explorer). After preparing the data set in PostgreSQL, I easily exported it to blobs in CSV format and made it available for Kusto to consume.

This post covers ingestion of the data into Kusto, while another post covers analyzing the data, post-ingestion.

Ingesting 1.55 Billion Taxi Trips

Preparing the data set

To prepare this data set, I mostly followed the instructions by Todd W. Schneider on his nyc-taxi-data GitHub repo. As the process is a little tricky and time consuming (using PostgreSQL on a single virtual machine), I’ve included a section with a few tips at the bottom of this post: Appendix: Tips for preparing the Yellow/Green Taxi trips data set.

Update: the enriched data set I used is now available in a public Azure blob storage container: https://kustosamplefiles.blob.core.windows.net/taxirides

Ingesting the files from Azure blob storage

Once the data set was prepared in Azure blob storage, the easy part was getting it into Kusto. First, I created the table with a schema which matches the data I exported from PostgreSQL:

.create table Trips (
    trip_id:long,
    vendor_id:string,
    pickup_datetime:datetime,
    dropoff_datetime:datetime,
    store_and_fwd_flag:string,
    rate_code_id:int,
    pickup_longitude:real,
    pickup_latitude:real,
    dropoff_longitude:real,
    dropoff_latitude:real,
    passenger_count:int,
    trip_distance:real,
    fare_amount:real,
    extra:real,
    mta_tax:real,
    tip_amount:real,
    tolls_amount:real,
    ehail_fee:real,
    improvement_surcharge:real,
    total_amount:real,
    payment_type:string,
    trip_type:int,
    pickup:string,
    dropoff:string,
    cab_type:string,
    precipitation:int,
    snow_depth:int,
    snowfall:int,
    max_temperature:int,
    min_temperature:int,
    average_wind_speed:int,
    pickup_nyct2010_gid:int,
    pickup_ctlabel:string,
    pickup_borocode:int,
    pickup_boroname:string,
    pickup_ct2010:string,
    pickup_boroct2010:string,
    pickup_cdeligibil:string,
    pickup_ntacode:string,
    pickup_ntaname:string,
    pickup_puma:string,
    dropoff_nyct2010_gid:int,
    dropoff_ctlabel:string,
    dropoff_borocode:int,
    dropoff_boroname:string,
    dropoff_ct2010:string,
    dropoff_boroct2010:string,
    dropoff_cdeligibil:string,
    dropoff_ntacode:string,
    dropoff_ntaname:string,
    dropoff_puma:string
)
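
Before queuing any data, it's easy to double check that the table was created as intended. A minimal sketch (assuming the TaxiRides database is the current query context):

.show table Trips cslschema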

For ingestion, I chose LightIngest - a simple command-line utility I find very useful for ad-hoc ingestion.

All I need to know is the ingestion endpoint of my cluster, the target database and table, the location of the source blobs, and their file name pattern and format.

And then I run the command:

LightIngest.exe
   "https://ingest-<myclustername>.<region>.kusto.windows.net;fed=true"
   -database:TaxiRides
   -table:Trips
   -source:"https://kustosamplefiles.blob.core.windows.net/taxirides"
   -pattern:"*.csv.gz"
   -format:csv

Measuring ingestion duration

On the client side, this runs in a matter of seconds, as it only queues the files for asynchronous ingestion (read more here).
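
Because the ingestion is asynchronous, failures (if any) don't surface on the client. One way to keep an eye on them is the .show ingestion failures command; a minimal sketch (the time filter is just an example):

// List ingestion failures reported in the last day, so that problematic blobs can be re-queued
.show ingestion failures
| where FailedOn > ago(1d)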

How long did it take the service to ingest these 1548 files with 1.55 billion records?

I ran this with 2 different configurations, to demonstrate Kusto's ability to scale its ingestion capacity depending on the number and kind of nodes the cluster has.

I ran the ingestion of the same set of blobs twice, changing the number of instances in my cluster in between: first with 2 D14_v2 nodes (into the Trips table), and then with 6 D14_v2 nodes (into the Trips2 table).

Then I can simply ask the service how long it took in each case, using either of the following options:

Using the .show commands command:

.show commands 
| where StartedOn > datetime(2019-02-04 06:00)
| where CommandType == "DataIngestPull"
| where Text contains 'into Trips'
| parse Text with * "into " TableName " (" *
| extend ClusterSize = case(TableName == "Trips", "2xD14_v2",
                            TableName == "Trips2", "6xD14_v2",
                            "N/A")
| summarize ['# Commands'] = count(), 
            StartTime = min(StartedOn), 
            EndTime = max(LastUpdatedOn)
            by ClusterSize,
            CommandType, 
            State
| extend Duration = EndTime - StartTime
ClusterSize  Duration          CommandType     State      # Commands  StartTime                    EndTime
2xD14_v2     00:47:33.2867817  DataIngestPull  Completed  1417        2019-02-04 06:00:39.4265609  2019-02-04 06:48:12.7133426
6xD14_v2     00:20:25.5162013  DataIngestPull  Completed  1415        2019-02-08 03:34:09.6342569  2019-02-08 03:54:35.1504582

Or, using the ingestion_time() function:

union withsource=TableName Trips, Trips2
| where pickup_datetime between(datetime(2009-01-01) .. datetime(2018-07-01))
| summarize 
    TotalTrips = count(),
    EarliestTrip = min(pickup_datetime),
    LatestTrip = max(pickup_datetime),
    IngestionDuration = max(ingestion_time()) - min(ingestion_time())
by TableName 
| extend ClusterSize = case(TableName == "Trips", "2xD14_v2",
                            TableName == "Trips2", "6xD14_v2",
                            "N/A")
| project-away TableName
ClusterSize  IngestionDuration  TotalTrips  EarliestTrip                 LatestTrip
2xD14_v2     00:46:57.8493213   1547471140  2009-01-01 00:00:00.0000000  2018-07-01 00:00:00.0000000
6xD14_v2     00:19:54.1510651   1547471140  2009-01-01 00:00:00.0000000  2018-07-01 00:00:00.0000000

And as you can see, it took only 20 minutes (on the 6-node cluster) to ingest these 1,547,471,140 records from 1548 source files, spanning 9.5 years' worth of data. They're now fully indexed and ready to query.
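
For example, a first sanity check over the newly ingested table could look like the following sketch (any aggregation would do):

// Count the ingested trips per year and cab type
Trips
| summarize TripCount = count() by Year = startofyear(pickup_datetime), cab_type
| order by Year asc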

Ingesting 0.5 Billion For-Hire-Vehicle Trips

To demonstrate how easy it is to use Kusto's client libraries to ingest data in supported formats, I chose to take this data set, in CSV format (with this schema), and ingest it using a queued ingestion client, which is available in Kusto's .NET client library. Needless to say, C# is just one of the languages in which the client libraries are available.

Looking at the NYC Taxi & Limousine Commission’s site, it’s easy to dynamically build a list of URLs for these CSV files, and have them ingested from Amazon S3, where they were made available.

For the purpose of this ingestion, I used the queued ingestion client from Kusto's .NET client library, authenticating with an AAD application (as shown in the code below).

Based on the schema provided on the site, I created the following table in my database:

.create table FHV_Trips (
    dispatching_base_num:string,
    pickup_datetime:datetime,
    dropoff_datetime:datetime,
    pickup_location_id:int,
    dropoff_location_id:int,
    shared_ride_flag:bool
)

And here’s the simple application I wrote and ran:

using System;
using Kusto.Data;
using Kusto.Ingest;

public static class Program
{
    public static void Main()
    {
        var kustoConnectionStringBuilder =
                new KustoConnectionStringBuilder(@"https://ingest-mycluster.region.kusto.windows.net")
                .WithAadApplicationKeyAuthentication(
                    "<application id>",
                    "<application key>",
                    "<authority id>");

        var startTime = new DateTime(2015, 01, 01);
        var endTime = new DateTime(2018, 07, 01);
        // A rough size estimate (~1GB per file), passed to the service as a hint
        var estimatedFileSize = (int)Math.Pow(10, 9);
        var ingestionProperties = new KustoIngestionProperties(databaseName: "TaxiRides", tableName: "FHV_Trips")
        {
            // I specify this because the provided source CSV files include a header line with the column names
            IgnoreFirstRecord = true
        };

        using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilder))
        {
            // One source CSV file per month, from January 2015 through June 2018 (42 files)
            for (var dt = startTime; dt < endTime; dt = dt.AddMonths(1))
            {
                var uri = $"https://s3.amazonaws.com/nyc-tlc/trip+data/fhv_tripdata_{dt.ToString("yyyy-MM")}.csv";
                ingestClient.IngestFromSingleBlob(
                    blobUri: uri,
                    deleteSourceOnSuccess: false,
                    ingestionProperties: ingestionProperties,
                    rawDataSize: estimatedFileSize);
            }
        }
    }
}

Measuring ingestion duration

On the client side, this runs in a matter of seconds, as it only queues the files for asynchronous ingestion (read more here).

How long did it take the service to ingest these 42 files with 0.5 billion records?

I can simply ask the service, using either of the following options:

Using the .show commands command:

.show commands 
| where StartedOn > datetime(2019-02-04 06:00)
| where CommandType == "DataIngestPull"
| where Text has 'ingest async into FHV_Trips'
| summarize ['# Commands'] = count(), 
            StartTime = min(StartedOn), 
            EndTime = max(LastUpdatedOn)
| extend Duration = EndTime - StartTime
Duration          # Commands  StartTime                    EndTime
00:02:35.0245767  21          2019-02-08 04:10:40.9281504  2019-02-08 04:13:15.9527271

Or, using the ingestion_time() function:

FHV_Trips
| where pickup_datetime between(datetime(2009-01-01) .. datetime(2018-07-01))
| summarize 
    TotalTrips = count(),
    EarliestTrip = min(pickup_datetime),
    LatestTrip = max(pickup_datetime),
    IngestionDuration = max(ingestion_time()) - min(ingestion_time())
IngestionDuration  TotalTrips  EarliestTrip                 LatestTrip
00:02:25.3214546   514304551   2015-01-01 00:00:00.0000000  2018-06-30 23:59:59.0000000

And as you can see, it took only 2.5 minutes to ingest these 514,304,551 records, spanning 3.5 years' worth of data. They're now fully indexed and ready to query.
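
As before, a quick sanity check can be run right away; a minimal sketch:

// Count the ingested for-hire-vehicle trips per month
FHV_Trips
| summarize TripCount = count() by Month = startofmonth(pickup_datetime)
| order by Month asc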

Now that I have all of this data ingested, it’s time to start analyzing it.


Appendix: Tips for preparing the Yellow/Green Taxi trips data set

If you're going to perform this on an Azure VM, you may find the following tips useful.

Note: These are not related to Kusto (Azure Data Explorer), but they may help you get the data prepared before ingesting it.

