Music Recommendations at Scale With Cloud Bigtable (Cloud Next ’19)


[MUSIC PLAYING] PETER SOBOT: So let me
just get started here. First off, who am I, and why
am I on stage in front of you? Well, my name’s Peter Sobot. I’m a senior engineer on
the personalization team at Spotify in New York City. You can find me on
Twitter @psobot, or tweet @SpotifyEng
about this talk. Feel free to take photos, and
hashtags, and all the things that kids do nowadays. So if you haven’t
heard of Spotify, we are one of the world’s
largest audio platforms. We’ve transformed the way that
people access and enjoy music around the globe. We proudly use Google
Cloud Platform to power the experiences that we
give to our 200-odd million monthly active users. We serve more than 40
million tracks to them across more than
79 active markets. We’re also talking today about one specific Google-powered technology,
which is Google Cloud Bigtable. If you haven’t heard
of Bigtable before, Bigtable is a high
performance wide column store. You can think of it
very, very roughly as a key value store
that’s distributed across multiple machines, and
is very, very high performance and parallelizable. Technically, it’s a
sparse distributed multi-dimensional sorted map. Bigtable is a
non-relational database. Meaning that there’s no joins. It’s not like a SQL database,
and it allows for easy scaling by increasing the number
of nodes in the cluster. That’s essentially
the only dial that you get to turn with Bigtable,
is the size of your cluster. It’s scalable to petabytes,
and indeed, we’ve scaled it to
petabytes at Spotify. It uses wide columns, and
again, it’s non-relational. So it’s not like your MySQL
or your Postgres-styles of databases. And like I mentioned, we use
a lot of Google Cloud Bigtable at Spotify. Across the company, we have
hundreds of Bigtable clusters, thousands of Bigtable
nodes, and petabytes of data stored across all
of those Cloud Bigtable type of clusters. So lots and lots
of usage of Cloud Bigtable across everything that
you see in the Spotify app. Some good examples of that
are some of these personalized experiences that
my team works on. So personalized
via Cloud Bigtable are things like the home page. So if you open up Spotify on
your mobile app, the very first thing you see, personalized
content for you. Well, that view is powered
both at the surface level and down underneath
by Cloud Bigtable. We also power things
like Discover Weekly, our weekly playlist
of tracks that we think you are definitely
into like this week, by Bigtable both at
the surface layer– so serving the direct
tracks to your phone, as well as underneath to
power the recommendations. And another example is the
Recommended Songs View. So if you scroll to the
bottom of any playlist that you make on Spotify,
and see a list of songs that we’d recommend that you
add to that playlist because of similarity, that’s
also partially powered by Google Cloud Bigtable. So Bigtable really
kind of underlies a lot of the features
that we have on Spotify. If you’re not familiar
with Cloud Bigtable itself, it allows for very
expressive schema designs. So I mentioned before
that it’s non-relational, and the schema that it gives
to you is something like this. You can have a row-key,
and all of your data is indexed by this single key. So you don’t have multiple
indices or different ways to cut the data. You essentially have a key
that you can use for lookups, and then a value
that is this kind of multi-dimensional
object, here. You know multiple column
families within that row, multiple columns within each
of those column families, and multiple values
within each cell as well, and those values are
all sorted by timestamp. So you can even do
versioning in here, and have some sort of expiration
of these values as well, and this is all provided to you
by the very expressive Bigtable API. And I don’t want to get too
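Concretely, that schema can be pictured as nested maps. Here’s a rough mental-model sketch in plain Python dicts — the shape of the data, not the actual Bigtable client API:

```python
# Mental model only: one Bigtable row as nested maps, not the client API.
# row = {column_family: {column_qualifier: [(timestamp, value), ...]}}
row = {
    "profile": {                          # a column family
        "name": [(1700000200.0, b"Peter")],
    },
    "plays": {                            # another column family
        "track:abc": [
            (1700000100.0, b"new"),       # multiple timestamped versions
            (1700000000.0, b"old"),       # older cell; GC could expire it
        ],
    },
}

def latest(row, family, qualifier):
    """Return the newest cell value, as a default Bigtable read would."""
    cells = sorted(row[family][qualifier], key=lambda c: c[0], reverse=True)
    return cells[0][1]

print(latest(row, "plays", "track:abc"))  # b'new': newest version wins
```

A single row key indexes the whole structure, and each cell keeps multiple values sorted by timestamp, which is what makes versioning and expiration possible.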
much into how Bigtable works under the hood because I’m
Spotify-er, not a Googler, but Bigtable– one very
important thing about how it works and how it
sorts its data is that the nodes in
your Bigtable cluster are compute only, essentially. So while you might be able to
increase the number of nodes that serve your cluster,
that gives you more capacity, but it doesn’t change
where the data is stored. The data in your cluster
is stored underlying in the Colossus
storage layer, there, which is a Google technology
that is not directly exposed in Google Cloud. This means that if you want
more capacity– if you want to add more requests per
second into your system, you can add more
nodes, and this simply reshards the data
across your nodes, and changes which data each
node is responsible for. So this doesn’t actually change
where the data is stored, it just changes which nodes
are responsible for which data. And this will come up
later on in the talk about how we can scale
Bigtable very effectively. So I’m going to talk
quickly about three different common Cloud
Bigtable usage patterns that we’ve seen in our usage
of Bigtable at Spotify. We have lots of
different products, as I mentioned
earlier, and they all use some variant of these
patterns that we’ve seen. The first pattern that we
see is using Cloud Bigtable as a real time data store. So using it to serve some data
in real time that might be computed offline or in batch. The second pattern
that we’ve seen is using Cloud Bigtable as a
caching layer, which allows us to– we’ll essentially use
it as a large distributed automatic cache. And the third
pattern is using it as a platform data store,
which takes a little bit more explanation. So I’ll wait until I get
there to talk about that. So first off, if we use Bigtable
as a real time data store, Bigtable allows us to
bridge two different worlds of data processing. We can do big batch offline
data processing that takes lots of
resources, lots of time, and lots of different
machines, and then dump the results of that data
processing into Cloud Bigtable to serve it in real time. This allows us to get
very low latency reads, and kind of write
this data all at once, and then read it multiple times
when different clients request this data. This allows Bigtable to kind
of bridge these two worlds, and do something we
couldn’t have done before. You can’t do this kind of
thing if you’re reading off of Google Cloud Storage, or
many other kinds of databases, but Bigtable really handles
this load very, very well. The input data for a lot
of our different products, for example, let’s say Discover
Weekly, might come in batch. We might train a model or do
some sort of recommendation product in Cloud Dataflow,
which is Google’s batch data processing framework. And then the results
of that computation end up being stored in
Bigtable by simply writing a huge amount of data
from Dataflow Workers into a Bigtable
cluster directly, and the data sizes depend
on the application here, but typically we tend to
use gigabytes to terabytes for the Bigtable size here. This works really
great for write once, read many use cases. So if you have a
lot of data that gets written once or
written infrequently, let’s say, again,
Discover Weekly. If you write that once a week,
and we want to read it multiple times– when users come back
to it and keep listening to the same playlist– Bigtable works really,
really great for this. And there’s some other
great features of Bigtable that allow this to be
even more maintenance free and effortless, but I’ll
talk about that in a second. Another use case that we
have for Cloud Bigtable is to use it as a caching layer. So rather than just using it to serve data in real time, which it still does in this diagram here, sometimes we’ll need Bigtable to cache
data in a kind of distributed way so that systems
that are stateless can still use this
distributed cache to avoid having negative side
effects on other systems. So a good example
here is if we use Cloud Dataflow in
streaming mode, rather than in batch mode. Cloud Dataflow can be used
to process streams of data. So as more data
comes in, you can operate the same transforms
and the same operations on new data. And in this case, sometimes
we have Cloud Dataflow streams that need to reach
out to other services. They might need to talk to
back end services, or APIs, or things like that. And if we did so without
any sort of caching, this could very quickly
overwhelm these back end services. We could have a stream
of data coming in, and if we can imagine every
single element in that stream results in a request
to a back end service, it’s very easy to overload
that, and make that fall over. Cloud Bigtable works
really, really well here because we can make a request
to Bigtable as a cache first, rather than running the risk of breaking these back end services. So this distributed cache
adds this stateful layer around the stateless
Cloud Dataflow service. Again, in this application,
the cache sizes range from gigabytes
to terabytes, but it really depends
on the exact application being used here. All right, so our third use
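That caching pattern is essentially a read-through cache: check Bigtable first, and only call the back end service on a miss. A minimal sketch, with a plain dict standing in for the Bigtable cluster and a hypothetical `fetch_from_backend` as the service we’re protecting:

```python
# Read-through cache sketch: a dict stands in for the Bigtable cluster.
bigtable_cache = {}
backend_calls = 0

def fetch_from_backend(key):
    """Hypothetical stand-in for an expensive back end / API call."""
    global backend_calls
    backend_calls += 1
    return f"result-for-{key}"

def get(key):
    # 1. Try the distributed cache first.
    if key in bigtable_cache:
        return bigtable_cache[key]
    # 2. On a miss, hit the back end once and write the result back.
    value = fetch_from_backend(key)
    bigtable_cache[key] = value
    return value

# A stream of repeated elements only costs one back end call per distinct key.
for element in ["a", "b", "a", "a", "b"]:
    get(element)
print(backend_calls)  # 2
```

This is how a stateless Dataflow streaming job can avoid sending one back end request per stream element.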
case for Cloud Bigtable here, this is a common one
that my team works on. This is Cloud Bigtable
as a platform data store, and this takes a little
bit more explanation here. So we often train
multiple different types of machine learning
models, especially for music
recommendation purposes, and these models have
slightly different properties, or they’re different versions
of the same kind of thing. When we train these
models, we want to produce some sort
of intermediate data that we can use
for recommendation. An example of this
is we might embed some data into a
certain space, producing vectors in that space. And when we take all
this resulting data, it doesn’t make
sense on its own. We couldn’t serve
this data directly to clients or to
users, and say here, these are your recommendations. These are not recommendations. These are really
intermediate pieces of data that can be used
to produce recommendations, but they themselves are
not the recommendations. So what we use
Bigtable for here is as a platform data
store to store all of this intermediate
data for all of these different models
that we might be testing, or might be using for
different use cases. Bigtable works
really, really great here because we can scale
this as big as we need to. We can add more models
for experimentation. We can have bigger
models, smaller models. All this kind of stuff just
gets shoved into Bigtable, and then we wrap this
Bigtable in a companion back end service. So the green box, right here. That companion back end
service can manage things like access control,
business logic, and caching so that
any consumers that want to use this
recommendations data can call the back end service,
and have one unified API for accessing this Bigtable
that stores all the underlying machine learning data. So this has been a very, very
successful project of ours to use Bigtable in
this manner, and this does drive a lot of our
recommendation systems, including some of the stuff
that I showed you earlier, like the homepage on Spotify. I mentioned a couple of
times that we do dump data into Dataflow– sorry, into
Bigtable from Cloud Dataflow, and that can be as easy as
these 10 lines of code here. So we have a framework
in-house and also open source called Scio. From the Latin “to learn,”
apparently, and Scio allows you to write very
small amounts of code in Scala, in what looks
like almost like a DSL, to read data from
certain places, map it, filter it, transform
it, and essentially build up an Apache Beam style
processing graph in Scala. With these 10 lines
of code here, we can read data from a
Google Cloud Storage bucket here, map that onto what we want
it to look like in Bigtable, and then with one single
line at the bottom, save as Bigtable there. We can dump all that data
into Bigtable all at once. So Scio is really, really useful
for bridging these two worlds, and doing complex
data processing, and it integrates super
nicely with Bigtable. I’ll talk a little bit more
about Scio later on as well, but definitely take a
look at this on GitHub if you’re interested. I also mentioned that Bigtable
has some very nice properties that make it good as a cache. One of those properties is using
its built-in garbage collection policy, or its built-in
expiration feature. Now, Bigtable allows
you to set a maximum age for your data, based on the
column family and the table that it lives in. This is directly from Google’s
developer documentation, but this max age
parameter says that maybe after seven days or 30
days, you want the data that you’ve written to
automatically expire from the cluster. And this means
that you don’t need to do any sort of
management of your cluster. You don’t need to have another
process read over the data and automatically
delete it for you. This happens in the
background, opportunistically, and you don’t need to waste any
CPU on it, or any of your mind, really. This just happens automatically. This is super, super useful, and
you can think of many use cases where if you have a write
once, read many application, this can just make your
maintenance headache go away. So I would definitely
recommend this. Another thing about using
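Roughly, the effect of a max-age garbage collection rule is that cells older than the configured age just disappear on their own. A small simulation of that effect (the real rule is configured once on the column family; this only models what a read sees afterwards):

```python
import datetime

MAX_AGE = datetime.timedelta(days=7)  # like a Bigtable max-age GC rule

def live_cells(cells, now, max_age=MAX_AGE):
    """Drop cells older than max_age, as Bigtable's background GC would.

    cells: list of (timestamp, value) pairs for one column.
    """
    cutoff = now - max_age
    return [(ts, v) for ts, v in cells if ts >= cutoff]

now = datetime.datetime(2019, 4, 10)
cells = [
    (datetime.datetime(2019, 4, 9), b"fresh"),   # 1 day old: kept
    (datetime.datetime(2019, 3, 1), b"stale"),   # ~40 days old: collected
]
print(live_cells(cells, now))  # only the fresh cell survives
```

The point is that nothing in your application has to run this loop; Bigtable does the equivalent opportunistically in the background.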
Bigtable for these use cases is that although it does have a
very expressive schema design, it’s possible for you to use
all of its column families and other features
like that to express your data in its
language, we found that it can be useful
to treat Bigtable as a very, very simple map. So essentially, a
distributed key value store where you have one
key, and you have one value. So in this example,
we’ve kind of ignored the column qualifier
and the column family as well. We just have them
as single characters here, because honestly,
all we really need is one, big distributed
key value store. And if we have these column
qualifiers and column families as single characters, that even
makes our processing faster. We have to send those
column qualifiers on every single request. So keeping those as small as
possible is of interest to us. And we found that if we encode
our values as part of our JSON, as long as we don’t need to do
any filtering on those values, or ask Bigtable to process them
before it fetches them for us, this can be extremely
efficient, and also lends itself well to doing caching in
the application layer, and other similar things. So Bigtable, while it does
have a lot of features, can be used in this
extremely simplistic way to get really, really great
performance out of it. All right, so let’s
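That simple-map usage can be sketched like this. The single-character family and qualifier `"d"` are just an illustrative convention to keep requests small, not anything Bigtable requires:

```python
import json

FAMILY = "d"     # single-character column family keeps requests small
QUALIFIER = "d"  # single-character qualifier, same reason

def encode_row(value):
    """Pack an arbitrary value into the one-cell-per-row layout."""
    return {FAMILY: {QUALIFIER: json.dumps(value).encode("utf-8")}}

def decode_row(row):
    """Unpack it again; Bigtable never filters or inspects the value."""
    return json.loads(row[FAMILY][QUALIFIER].decode("utf-8"))

row = encode_row({"track_ids": ["a", "b"], "score": 0.9})
print(decode_row(row))  # {'track_ids': ['a', 'b'], 'score': 0.9}
```

The trade-off is explicit: you give up server-side filtering on the value in exchange for a dead-simple distributed key-value store.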
talk a little bit about scaling challenges. As I mentioned before,
we have petabytes of data stored across
hundreds of Bigtable clusters, and so we’ve run into a number
of different challenges trying to scale Bigtable to meet
our application load. So the first problem
we’ve run into is how to scale the
cluster during ingestion. If, as I mentioned
before, we’re trying to ingest huge amounts of
data via Cloud Dataflow all at once into a
Bigtable cluster, sometimes we’ll easily
overrun the cluster with that write volume. This is a graph of– actually, I don’t know
which cluster this was, but you can see that
at different times, when we’re ingesting data,
we’ll have huge spikes in CPU utilization, and this is not
conducive to having a good user experience. You want to make sure
that when you ingest data into your system,
you’re not overloading it such that the read requests
essentially get drowned out and suffer much worse latency. So what we found
is a good technique is that we’ll scale
the Bigtable cluster up just before doing ingestion. So we’ll start our
Dataflow pipeline, and the first
thing it will do is it will change the number
of nodes in the cluster that it wants to write into. So you can do this easily
via the Bigtable API, and you’ll temporarily
scale this up by a factor of
maybe two or three, or some factor that we’ve
empirically determined makes sense for this use case. Once we scale it up, we’ll
then ingest all the data at the maximum speed that
Dataflow will allow us to do, and even at that speed, we
won’t impact the read requests. So we’ll have still a
good user experience during that entire process. And once that’s done,
we’ll wait a little while as well for the data to settle
down and find its right home. Then once that’s done,
we can drop the size back down again, so we’re not over
provisioned, and spending extra money on capacity that
we don’t need at all times. So in a timeline, this
is what this looks like. We add nodes to the cluster. We’ll wait for the
cluster size to increase. Wait for some additional
time for caches to warm, and things like that. We’ll ingest data all at the
maximum speed we possibly can, and then wait for
that data to settle. And once that’s done, we’ll
remove those extra nodes from the cluster to
keep our costs low, and this solution tends
to work fairly well. However, there is a new feature
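In outline, the ingestion pipeline just brackets the big write with two resizes. This sketch uses a hypothetical `set_cluster_nodes` as a stand-in for the real Bigtable admin API resize call, and a scale factor of 3 as an assumed, empirically tuned value:

```python
import time

cluster_nodes = 10          # pretend this is the current serving size
events = []                 # record of what the pipeline did

def set_cluster_nodes(n):
    """Hypothetical stand-in for the Bigtable admin API resize call."""
    global cluster_nodes
    cluster_nodes = n
    events.append(("resize", n))

def ingest_batch(rows):
    """Stand-in for the Dataflow write into Bigtable."""
    events.append(("ingest", len(rows)))

def run_ingestion(rows, scale_factor=3, settle_seconds=0):
    base = cluster_nodes
    set_cluster_nodes(base * scale_factor)  # 1. scale up before writing
    time.sleep(settle_seconds)              # 2. wait for tablets to move
    ingest_batch(rows)                      # 3. write at full Dataflow speed
    time.sleep(settle_seconds)              # 4. let the data settle
    set_cluster_nodes(base)                 # 5. drop back down to save cost

run_ingestion(["row"] * 5)
print(events)  # [('resize', 30), ('ingest', 5), ('resize', 10)]
```

The resize is the only dial Bigtable gives you, which is exactly why the pipeline turns it up before the write and back down afterwards.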
from the Cloud Bigtable team that might make this
no longer necessary, and that is the
Bigtable replication feature that they’ve
recently launched. Now it’s possible to have
multiple clusters replicate to each other. Or rather, replicate from one
cluster to another cluster. And what we’ve
found is that if we have a write cluster
and a read cluster, with replication
in between them, this can help us solve
this problem in a slightly different way. So in this diagram
here, at very top, we have Cloud Dataflow writing
a huge amount of data in. That’s that very thick arrow. And the write CBT cluster here
is taking all those writes in. It might be a cluster that’s
larger or maybe provisioned differently so that it can
accept all this capacity. And then Bigtable’s asynchronous replication will take that data
from the write cluster, and write it into the read
cluster automatically. This writing happens
in the background. It’s not something you
need to manage yourself. You just turn it on, and
then it runs when it can. This also means that this
usage is eventually consistent. So if you need instant
consistency for your data, this is not something
that you can really use for that use case. But once the data
has all been ingested into the write
cluster, at some time afterwards, that data will have
propagated through to the read cluster without affecting the read cluster’s read latency or
anything like that. And that’s great. That means that we can
get this high performance, and the back end servers
never really know that we did an
ingestion, and all we had to do was
set up replication between two different clusters. Keep in mind, though, that if
you use a solution like this, you’ll have to keep
two clusters up, and doing all that
management does take a little bit of extra
cost, and a bit of extra time. So really, pick one of
these two solutions, depending on your use case. Another problem we’ve had
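The write-cluster/read-cluster split can be pictured with a toy simulation: writes land on one side and are copied across asynchronously, so readers are eventually consistent. Plain dicts stand in for the two clusters here:

```python
write_cluster = {}
read_cluster = {}
pending = []  # changes not yet replicated across

def write(key, value):
    """All ingest traffic goes to the write cluster only."""
    write_cluster[key] = value
    pending.append(key)

def replicate():
    """Models the background, asynchronous copy to the read cluster."""
    while pending:
        key = pending.pop(0)
        read_cluster[key] = write_cluster[key]

write("user#1", "recs-v2")
# Immediately after the write, readers may still see stale/missing data...
stale = read_cluster.get("user#1")   # None: not replicated yet
replicate()                          # ...until replication catches up.
fresh = read_cluster.get("user#1")
print(stale, fresh)  # None recs-v2
```

That window between `stale` and `fresh` is the eventual consistency you sign up for in exchange for keeping ingest load off the read cluster.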
when trying to scale Bigtable involves optimizing
our key spaces. So this graph on
the screen here is a screenshot from
Google’s Stackdriver Key Visualizer tool, which allows
you to visualize the activity in your
Bigtable cluster per row. So the axes here are a
little bit confusing. Time is across the bottom. So from left to right, you
have time going forward. The row key in your database
is from top to bottom. So you essentially have rows
that start with A at the top, and rows that start with Z
at the bottom, let’s say. And then the color
of each cell here is the activity for
that row at that time. So in this graph
here, the brightness on the left-hand
side of the graph is an ingestion
taking place, and then over time, as the
ingestion finishes, the colors become more
muted to show that there’s less traffic over time. But nonetheless, the traffic
is kind of evenly distributed. No one row in this
database has more reads or writes than any
other row in this database, for instance. So this is a very
important property. If some of these rows
received a lot more traffic than other rows, this database
would not be easily scalable, and I’ll talk about
why in a second. Let’s say we have a
key visualizer that looks like this. Very similar to the last
one, but now there’s a bright line across
the middle of the graph. That bright line is telling
us that one row key has more reads and writes than
the rest of the cluster, and that’s a sign that this
one key, because it’s so hot, it’s kind of limiting the
scalability of our cluster, and causing us to kind of
waste money on our cluster if we try to scale it up,
and let me show why that is. So we’re going to do
an exercise together. You don’t have to do anything. Just sit there. But imagine this is
our row key space. This is all of the keys
in our Bigtable cluster. We have the entire
alphabet, and we want this Bigtable cluster to
have two different nodes in it. So a two-node
cluster with 26 keys. The first 13 go to node 1, and
the second 13 go to node 2. This is not something
that we can configure. This is how Bigtable segments
your key space, no matter what. So essentially,
the keys are sorted alphabetically, or
technically, lexicographically. And this means that
if you add more nodes, it’ll just chop up the key
space in the same exact way, and each node will get
a proportional amount of the key space. Now, there’s a
big asterisk there in that certain situations
can cause Bigtable to kind of rebalance the
cluster a little bit more, but you always have
this alphabetical setup happening here. So in this cluster,
node 1 and node 2 both have 50% of the
traffic to the cluster. So the key space
is perfectly even. No key is receiving more
traffic than any other key, and it’s easy to scale this. If we add more
nodes, we reshard, and everything
gets really quick. And even better, our
99th percentile read time right now is only
10 milliseconds. So if this is a metric
that’s important, it’s certainly
important to us, we want to make sure that
in the 99th percentile, our read times are still good. So 10 milliseconds, pretty
good for this cluster. That’s kind of our target. Now, let’s say that
something goes wrong. We get a traffic spike to a
hotkey that starts with J, and J is currently
on node 1 there. So with a huge
amount of new traffic coming into this one key, what’s
going to happen to the cluster? And more importantly,
what’s going to happen to our read time? If we do the math, it
turns out that node 1 now gets 90% of the
traffic to the cluster, and node 2 only gets 10%. Keep in mind that
node 2 is still serving the same number
of requests it was before. It’s just that there’s so much
more traffic going to key J, that it’s kind of skewing
the balance between these two nodes. And if we look at the
number at the top here, the 99th percentile
read time has now shot up to 60 milliseconds. That’s pretty bad. If we started at 10
milliseconds, now we’re at 60, that’s probably going to trip some alarm and wake someone up at 3:00 in the morning. So if I’m that
engineer who woke up at 3:00 in the morning who is
trying to solve this problem, what do I do? Well, Bigtable allows me to
have one parameter to configure, and that is the number
of nodes in the cluster. So let’s say I’m kind of groggy,
I don’t know what’s going on. I try to fix the
problem by turning up the number of nodes
in the cluster, and that gives me this. A cluster that has
four nodes in it, and each of these four nodes now
has a quarter of the row space. Node 1 contains A through
G, node 2 is G through M, node 3 is M through
T, and so on. But the key that
starts with J there is still receiving
a lot more traffic than all of the other
keys in the cluster. So if we do the
math again here, we see that we now have a
very unbalanced cluster. Nodes 1, 3, and 4
are each serving 5% of traffic, which is great. They have a lot of capacity, and
they’re going to be very fast. However, in the worst case,
or even in the 99th percentile case, node 2 is serving
85% of our traffic, and that 85% is much worse
than we expected it to be. We wanted to double
the number of nodes, and thus, cut the
latency in half. But in effect, we’ve really
only cut the latency down by three milliseconds. What gives? Why is it that we can
double the number of nodes, and thus our cost, and not
get a performance gain? Well, we did the math,
and pull out your phones. You might want to take
a screenshot of this. This is the relationship between
the latency of your cluster and the ratio of the traffic to
your hottest key to the traffic of the median key. To the best of my knowledge,
this is what we go off of, and this is how we understand
how to scale Bigtable. So if the traffic to your
hottest key is very, very high, and the traffic to your
median key is very, very low, and the ratio between
them is significantly more than one, then this is
going to limit scalability by a large factor. If the traffic to the hottest
key and the median key are the same, that ratio
ends up being just one, and then the traffic
to your cluster is divided by all the nodes in
the cluster, and you’re golden. You can scale as
much as you want to just by increasing
the number of nodes. It might be more useful
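You can reproduce the numbers from this example with a small simulation: 26 keys with one unit of traffic each, except J, which gets 105 units (the value that makes node 1 come out at 90%), split into contiguous key ranges the way Bigtable splits its key space:

```python
import string

def node_shares(traffic, n_nodes):
    """Split sorted keys into n contiguous ranges (as Bigtable does)
    and return each node's fraction of total traffic."""
    keys = sorted(traffic)
    size, extra = divmod(len(keys), n_nodes)
    shares, start = [], 0
    for i in range(n_nodes):
        end = start + size + (1 if i < extra else 0)
        load = sum(traffic[k] for k in keys[start:end])
        shares.append(load / sum(traffic.values()))
        start = end
    return shares

traffic = {k: 1 for k in string.ascii_uppercase}
traffic["J"] = 105  # one hot key

print([round(s, 2) for s in node_shares(traffic, 2)])  # [0.9, 0.1]
print(max(node_shares(traffic, 4)))  # ~0.85: doubling nodes barely helped
```

With two nodes, the J node carries 90% of the traffic; doubling to four nodes still leaves one node at about 85%, which is exactly why adding nodes barely moved the p99.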
to see this as a graph, though, because certainly this
doesn’t make any sense to me. Here’s this plotted as a graph. So let me explain
what this means. Across the very
bottom of the graph, here, we have the number of
nodes in your Bigtable cluster. At the very left-hand
side of the graph, that’s a Bigtable
cluster that has one node in it, and at the
very right-hand side, that’s a Bigtable cluster that
has 100 nodes in it. So you can turn that
dial from 1 to 100 or beyond very, very easily. The four different
lines in this graph represent four
different distributions of data in the cluster. So the very top,
we have a red line. That red line is
what happens if you have one key receiving
100% of the entire traffic to the cluster. If you add more
nodes to the cluster, that will not fix your latency. It will not add throughput
because, essentially, that one key is still only
being served by one node. The additional
nodes that you add don’t do anything
because they don’t own any data that’s actually
being used for anything. So that’s the worst
case scenario. This is why you don’t want
to use Bigtable for just one piece of data, essentially. The best case scenario
is the opposite, here. It’s the green line
at the very bottom. That’s what happens if you
have flat traffic distribution. No key is more loaded than
any other key in the cluster. And that green line is great. That means that if
you have one node, your latency might
be 100 milliseconds. If you have two
nodes, your latency might be 50 milliseconds. Four nodes, 25
milliseconds, and so on. Your latency is, essentially,
the traffic to your cluster divided by the number
of nodes, and that’s why we get this nice
line here because at 100 nodes, in theory,
each node is only processing 1% of the traffic. So you get a really, really
great scalability curve here. We have two curves in
the middle that kind of show why you care
about this if you have different
distributions of data. If one key receives 100
times the median traffic, that’s the orange line,
second to the top there. You can add more
nodes to the cluster, and it will get better. The scalability will improve and
your p99 will improve, but not by that much, and that’s
because one node is still significantly bottlenecking
the rest of the cluster. The yellow line, second
from the bottom there, that’s another example. That’s what happens
if one key receives 10 times the median traffic,
which doesn’t seem like too much, but adding more
nodes will still not increase your performance
in the worst case. So this is definitely
something that we’ve used to try and re-architect
our systems at Spotify to make sure that everyone
gets a good experience every single time. All right, so if having your key
space designed in a certain way is super important, how do
you design your schemas? What are some good
tips and tricks to make sure you don’t
run into this problem? Well, choosing an evenly
distributed key space is probably the
number one thing you need to care about in your
schema design for Bigtable. And there are three
strategies I’ve run into here. They’re in order
of worst to best. So strategy number one is
not something you should do. Strategy number
three is something that I would recommend doing. So I’ll go through
them in order, and talk about why you shouldn’t
do them or should do them. So the first strategy for
distributing your keys across the entire key space
is to hash all the keys. So you could take the keys
that you would regularly put in the database. Let’s say that maybe they’re
user names or something like that, and just hash them. Pick a non-cryptographic
hash function. It doesn’t need to be secure. It just needs to be fast
because the only purpose of this is to distribute this
data across the cluster. This works great. And technically,
you can do this, and you’ll get good performance. However, this is not good for
a whole bunch of other reasons. If you hash this data
randomly, or you hash it with a hash function, you
can’t use the cbt command-line tool anymore. You can’t use your
debugging tools. You can’t read the key
space on your screen because it doesn’t
really make sense to you. It’s essentially obfuscated, and
that’s not good for debugging or for developing applications. You also can’t do range
scans on this data. So if your keys have
common prefixes, it’s very easy in Bigtable
to do range scans, and say start at this key,
and go until you hit this key. Well, if your keys
are hashed, they’re effectively randomly
distributed, which is great for
performance, but not good for any sort
of data locality, or understanding where data
is similar to other data. So a good second
approach is to reverse all the keys in your key space. So we tried this
for a little while. Assuming you have some
sort of numeric ordering, meaning that the digit at
the very right of your data, or at the least
significant digit changes more often than
the most significant digit. So an example of this
is maybe timestamps. If you had timestamps
in Bigtable, you could reverse the
keys, and then the digit that changes most
frequently every second would, essentially,
scatter your data across all the different
nodes in the cluster. This, again, technically works. You could reverse all the keys
before inserting the data, and then reverse all the
keys when you read again. This is also more human
readable and more debugable because you can see
that data there. You can read it
backwards and put it through a script or
something like that, and now it’s
possible to do that. But this is still not great. There’s still a better
solution available, and that’s solution
number three, here, which is to promote the most
random field from your data to the row key. So try to find
something in your data that can be used to impart some
randomness into your row key, and move that from
the value that you’re storing into the row itself. So if you have some sort
of a random identifier, let’s say maybe it’s an
item ID or a UUID, or something like that, and if
that’s consistent in your data, and it’s random, but
it’s deterministic, try to put that
into your row key, thus distributing that data
randomly across your cluster, but still giving you
some business logic you can do on top of that. This preserves readability,
uses very little CPU overhead, aids debugging, and allows
you to do range scans if those keys are consistent. So I keep talking about
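Side by side, the three key strategies look something like this. The field values here are made up purely for illustration:

```python
import hashlib

user, timestamp = "peter", "20190410123000"

# Strategy 1 (worst): hash the whole key. Spreads load evenly, but the
# key space becomes unreadable and range scans are impossible.
hashed_key = hashlib.md5(f"{user}#{timestamp}".encode()).hexdigest()

# Strategy 2 (better): reverse the key so the fastest-changing digit
# comes first. Still readable (backwards), still debuggable with effort.
reversed_key = f"{user}#{timestamp}"[::-1]

# Strategy 3 (best): promote an already-random but deterministic field
# (say, an item UUID from the value) to the front of the row key.
item_id = "9f1c2b"  # pretend this came from the stored value
promoted_key = f"{item_id}#{user}#{timestamp}"

print(hashed_key[:8], reversed_key[:8], promoted_key)
```

Only the third form keeps the key human-readable and prefix-scannable while still scattering rows across the cluster.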
these range scans, and I haven’t really defined
exactly what they are. So in Bigtable,
because all the data is ordered alphabetically
or lexicographically, you can scan from one key to
another very, very simply. You can tell Bigtable to start
a scan at a certain place, and end a scan at another
place, and even do a prefix scan on that
because it’s, essentially, the same thing. If you have a random
distribution of your data across the cluster
and your key space, you can still do these
row range scans provided that you know what the
first section of your data looks like. And if you exploit this even
more, you can have tiered keys. So tiered keys are where the
first section of your key is kind of a superset of all
the later sections of the key, and let me talk a little
bit of what that means. So here’s a very
contrived example. Don’t do this, it’s just for
explanation purposes here. But let’s say that you’re
keying your data in Bigtable by keys that look like this. They start with a country
that your data is in, the region, the city,
and then the district, and all of these
together form one key. So your keys might
look something like this, where you’d have
the country at the first part here, so Canada, followed by the
region, or state, or province, or whatever you have there. Then a city, and
then a district. This means that you can do
range scans on this data very, very effectively. If you wanted to find all
of the cities in California, let’s say, you
could ask Bigtable to range scan from
the start of USA colon California to the end
of USA colon California, and that would give you all
those rows in one query. This is great,
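To make the mechanics concrete, here’s a small local model of that scan in Python; Bigtable does this server-side over the sorted key space, and the place names are just sample data:

```python
keys = sorted([
    "Canada:Ontario:Toronto:Downtown",
    "USA:California:Los Angeles:Hollywood",
    "USA:California:San Francisco:Mission",
    "USA:New York:New York:Brooklyn",
])

def range_scan(sorted_keys, start, end):
    # A range scan returns every row whose key is >= start and < end,
    # in sorted order.
    return [k for k in sorted_keys if start <= k < end]

# ";" is the ASCII character right after ":", so this bounds the prefix:
range_scan(keys, "USA:California:", "USA:California;")
# → both California rows, nothing else
```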
however, this still doesn’t randomly
distribute the data across your entire database. So you have a bit of
a performance problem here, and one
strategy around that is to remove the
first part of the key, if you know that it
has low cardinality. So in this case, there are only
190-odd countries in the world, and because of that,
all of the USA data would sit at the very
bottom of your cluster, on those same nodes. And all the data for
certain countries would be kind of right
next to each other. So to get around that,
you can take this mapping of, let’s say,
country to region, or some other very, very
low cardinality mapping, and put that into
your application. And then in Bigtable, you could
store just the second layer of your key forward. There are many more
regions in the world, many more states and provinces
than there are countries, and doing this lets you
spread that data across the cluster, not quite
randomly, but in a way that helps you
scale without hot spotting on certain areas of the cluster. Let’s talk about one
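One way to picture that, as a sketch: keep the tiny country-to-region mapping in application code, and key rows starting at the region tier (the mapping and names here are made up for illustration):

```python
# The low-cardinality lookup lives in the application, not in the row key.
REGION_TO_COUNTRY = {"California": "USA", "Ontario": "Canada"}

def row_key(region: str, city: str) -> str:
    # Drop the country tier: with only ~190 countries, leading with it
    # would pile each country's rows onto a few adjacent nodes.
    return f"{region}:{city}"

def country_for(region: str) -> str:
    # Recover the dropped tier from the in-app mapping when needed.
    return REGION_TO_COUNTRY[region]
```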
more example here. This is a key design for
doing row range scans over data that involves IDs. So this is a real example
from Spotify’s data. Obviously, AAA1 and
BBB2 are not real users. This is all kind of made
up, but this is a real task that we have to solve. What do we do if we
want to know what tracks a user has listened to? So if I’m AAA1,
and I’ve listened to track one at timestamp 123,
we can store that in Bigtable with a schema that
looks like this. Start with AAA1, then the second
part of the key is track one, and then the timestamp
encodes exactly when I listened to that. And then if I wanted to do
a range scan and find out exactly when this user has
listened to all these tracks, well, I ask Bigtable
to range scan AAA1: give me all the rows that
start with this value, and it gives me back
hundreds, maybe thousands, maybe tens of thousands of rows. And each row contains
both that user ID, and the track
they’ve listened to, and the time they
listened to that track. Now, because this is a
tiered key structure, I can go even further. I can say tell me not
just when user AAA1 has listened to this track, but
specifically, track two. So if I do a row range scan
from AAA1, including track two, I’ll get all the
different plays that user has had of that track,
and that gives me all those rows that are kind of
continuous next to each other in that Bigtable cluster. So this is a super
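Sketched locally in Python, the two tiers of that scan look like this (the rows are toy data, and a real client would run the prefix scan server-side):

```python
rows = sorted([
    "AAA1#track1#123",
    "AAA1#track2#456",
    "AAA1#track2#789",
    "BBB2#track1#123",
])

def prefix_scan(sorted_rows, prefix):
    # A prefix scan is one contiguous slice of the sorted key space.
    return [r for r in sorted_rows if r.startswith(prefix)]

prefix_scan(rows, "AAA1#")         # every play by user AAA1
prefix_scan(rows, "AAA1#track2#")  # only their plays of track two
```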
useful technique to try and put your
data in a format that will be both scalable and
performant across Bigtable, but also in a way
that you can still get this kind of
performance you’d expect out of a relational database
with multiple indices. OK, I’m just running along here. This took less time than
I expected, but let’s talk a little bit about open source. So I mentioned earlier,
we have some tools that are very, very useful. You can use these in
your own pipelines, and get your data
processing done much quicker with some of these tools. So we love open source. We have a lot of open
source projects on GitHub. If you check out
github.com/spotify, you’ll find these projects,
and many, many more. The first project I want to talk
about is a project called Scio. So like I mentioned
before, Scio comes from the Latin “to learn.” It is a Scala based framework
for Apache Beam and Google Cloud Dataflow,
and it’s inspired by Apache Spark and Scalding. It’s fairly popular in the
open source community now. It’s got a couple
thousand stars on GitHub, if that’s the measure
of popularity, and we use this
very, very heavily internally to try and do
all of our data processing. So like the example I
showed you before, this allows us to, let’s
say, grab data from GCS, and dump that into
Bigtable with maybe only 10 lines of code. We could also do
complicated data pipelines. You can map, filter,
do all the operations you’d expect from a functional
programming language, but across large sets of data
on Cloud Dataflow in the cloud. So Scio is super, super useful. Check that out on GitHub. There are lots of tutorials and
other documentation available, and we’d love to see
you start using that. Simple Bigtable is the second
thing on the slide here. This is a simpler, more
asynchronous Bigtable client for Java and Scala. Now, the official Bigtable
client works just fine. It’s just a little bit verbose. So what we did was we
wrote a simpler client. It’s based on the
Java builder pattern, and it allows you to do
asynchronous operations against Bigtable with a
very small amount of code. This also powers a lot of
our production experiences. When I mentioned earlier that
Discover Weekly, and the Home Page, and stuff like that is
all served out of Bigtable, it’s going through this library. So this is production
ready code. Try to use that, and you’ll
see your Bigtable interactions become much terser in your
code, and easier to work with. And finally, I want
to talk a little bit about a project called
Annoy, and Annoy is a library for approximate
nearest neighbors calculations. What that means is if you’re
doing machine learning and you have
representations of data, Annoy allows you to select
similar data in spaces. I’m not doing a machine
learning talk here, so I’m not going to
get too much into it, but it’s approximate nearest
neighbors, and the OY, I think, stands for Oh Yeah. But Annoy, again, is on GitHub. Very popular, with lots of stars. It’s a performant library, maybe not the most performant
in the world, but it has great
bindings for most major languages,
and it’s very widely used in the community. So try to use Annoy if
you have machine learning tasks that could use
nearest neighbor lookups. And with that, I’ve
breezed through this talk. Again, my name is Peter
Sobot, Senior Engineer on the Recommendations
team at Spotify, New York. Tweet me @psobot or @SpotifyEng. And please, do give
feedback in the app there, in the Google Cloud Next app. This feedback goes back
to the Google Cloud team to figure out which
sessions to have next time. Thank you so much for your time. [APPLAUSE]
