Scalding the Crunchy Pig for Cascading into the Hive
@dawhiting
Stockholm Sept 2013
NYC Oct 2013
"Build a Cascade from Flows which connect Taps via Pipes built into Assemblies to process Tuples"
[Diagram: job inputs and output]
public class CascadingStreamExample extends SubAssembly {
private Fields trackPlayedMessageFields =
new Fields("tpm.track_id", "tpm.user_name", "tpm.timestamp");
private Fields userInfoFields =
new Fields("u.user_name", "u.country", "u.subscription_level");
private Fields trackMetadataFields =
new Fields("t.track_id", "t.title", "t.artist");
public CascadingStreamExample(Pipe trackPlayedMessage, Pipe userInfo, Pipe trackMetadata) {
Pipe tpmUser = new CoGroup(trackPlayedMessage,
new Fields("tpm.user_name"),
userInfo,
new Fields("u.user_name"),
new InnerJoin());
Pipe tpmUserTrack = new CoGroup(tpmUser,
new Fields("tpm.track_id"),
trackMetadata,
new Fields("t.track_id"));
Pipe canonicalise = new Each(tpmUserTrack,
new Fields("t.artist"),
new CanonicalArtist(),
Fields.REPLACE);
Pipe aggregate = new AggregateBy(canonicalise,
new Fields("canonical_artist", "u.country"),
new CountBy(new Fields("count")));
Pipe cut = new Retain(aggregate, new Fields("canonical_artist", "u.country", "count"));
Pipe rename = new Rename(cut, Fields.ALL, new Fields("artist", "country", "streams"));
this.setTails(rename);
}
}
public class CanonicalArtist extends BaseOperation<Object> implements Function<Object> {
public CanonicalArtist() {
super(1, new Fields("canonical_artist"));
}
@Override
public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {
String artist = functionCall.getArguments().getString(0);
String canonicalArtist = artist.toLowerCase();
functionCall.getOutputCollector().add(new Tuple(canonicalArtist));
}
}
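The SubAssembly above only describes the pipe graph. To run it, the head Pipes must be bound to source Taps, the tail to a sink Tap, and the whole thing connected into a Flow. A minimal sketch of that wiring, not from the slides: the runner class name is made up, and the tab-delimited HDFS paths simply mirror the ones used in the Pig example later.

import java.util.Properties;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class CascadingStreamRunner {
  public static void main(String[] args) {
    // Head pipes; their names identify the sources in the FlowDef.
    Pipe tpm = new Pipe("trackPlayedMessage");
    Pipe users = new Pipe("userInfo");
    Pipe tracks = new Pipe("trackMetadata");

    // The assembly from the previous slide.
    CascadingStreamExample assembly = new CascadingStreamExample(tpm, users, tracks);

    // Source and sink taps: tab-delimited files on HDFS (paths are hypothetical),
    // using the same field layouts declared at the top of the assembly.
    Tap tpmTap = new Hfs(new TextDelimited(
        new Fields("tpm.track_id", "tpm.user_name", "tpm.timestamp"), "\t"), "/tpm");
    Tap userTap = new Hfs(new TextDelimited(
        new Fields("u.user_name", "u.country", "u.subscription_level"), "\t"), "/user_info");
    Tap trackTap = new Hfs(new TextDelimited(
        new Fields("t.track_id", "t.title", "t.artist"), "\t"), "/track_metadata");
    Tap sinkTap = new Hfs(new TextDelimited(
        new Fields("artist", "country", "streams"), "\t"), "/output", SinkMode.REPLACE);

    FlowDef flowDef = FlowDef.flowDef()
        .setName("artist-country-streams")
        .addSource(tpm, tpmTap)
        .addSource(users, userTap)
        .addSource(tracks, trackTap)
        .addTailSink(assembly.getTails()[0], sinkTap);

    Flow flow = new HadoopFlowConnector(new Properties()).connect(flowDef);
    flow.complete();
  }
}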
class ArtistCountryStreams(args: Args) extends Job(args) {
val trackPlayedMessageSchema = ('tpmTrackId, 'tpmUserName, 'tpmTimestamp)
val userInfoSchema = ('uUserName, 'uCountry, 'uSubscriptionLevel)
val trackMetadataSchema = ('tTrackId, 'tTitle, 'tArtist)
SomeSource(args("tpm"), trackPlayedMessageSchema)
.joinWithSmaller('tpmUserName -> 'uUserName,
SomeSource("userInfo", userinfoSchema))
.joinWithSmaller('tpmTrackId -> 'tTrackId,
SomeSource("trackMetadata", trackMetadataSchema))
.map('tArtist -> 'canonArtist) { tArtist: String => tArtist.toLowerCase }
.project('canonArtist, 'uCountry)
.groupBy('canonArtist, 'uCountry) { _.size('streams) }
.rename(('canonArtist, 'uCountry) -> ('artist, 'country))
}
Works with real types, with an execution strategy based on lazy collections
PCollection<T>
PTable<K,V>
PGroupedTable<K,V>
MapFn<T1,T2>: T1 → T2
CombineFn<K,V>: (K, Iterable<V>) → (K, V)
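A rough illustration, not from the slides, of how these types compose: parallelDo applies a MapFn to turn one PCollection into another, count() produces a PTable, and groupByKey() plus a combiner walk through PGroupedTable. The class and method names here are made up.

import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.types.avro.Avros;

public class CrunchTypesSketch {
  public static PTable<String, Long> playsPerUser(PCollection<String> usernames) {
    // MapFn<T1, T2>: T1 -> T2, applied via parallelDo to get a new PCollection.
    PCollection<String> lowered = usernames.parallelDo(
        new MapFn<String, String>() {
          @Override
          public String map(String input) {
            return input.toLowerCase();
          }
        },
        Avros.strings());

    // count() keys the collection by its elements, yielding a PTable<K, V>.
    PTable<String, Long> counts = lowered.count();

    // groupByKey() gives a PGroupedTable<K, V>; combineValues applies a
    // combiner: (K, Iterable<V>) -> (K, V). Re-grouping after count() is
    // redundant in practice; it is only here to show the PGroupedTable step.
    PGroupedTable<String, Long> grouped = counts.groupByKey();
    return grouped.combineValues(Aggregators.SUM_LONGS());
  }
}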
public PCollection<ArtistCountryStreams> computeStreams(
PCollection<TrackPlayedMessage> trackPlayedMessages,
PCollection<UserInfo> userInfos,
PCollection<TrackMetadata> trackMetadatas)
{
PCollection<Pair<TrackPlayedMessage, UserInfo>> messageUser =
Join.innerJoin(
trackPlayedMessages.by(new TrackPlayedMessage.Username(), Avros.strings()),
userInfos.by(new UserInfo.Username(), Avros.strings()))
.values();
PCollection<ArtistCountryStreams> artistCountryStreams =
Join.innerJoin(
messageUser.by(mapFirst(new TrackPlayedMessage.TrackId(), UserInfo.class),
Avros.ints()),
trackMetadatas.by(new TrackMetadata.TrackId(), Avros.ints())
)
.values()
.parallelDo(new CreateArtistCountryStreamsKey(),
Avros.reflects(ArtistCountryStreams.Key.class))
.count()
.parallelDo(new ArtistCountryStreams.Create(),
Avros.reflects(ArtistCountryStreams.class));
return artistCountryStreams;
}
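The helpers referenced above (mapFirst, CreateArtistCountryStreamsKey, ArtistCountryStreams.Key, ArtistCountryStreams.Create) are not shown on the slides. One plausible sketch follows; everything here is a reconstruction, grouped into a single helper class only for readability, and it assumes UserInfo exposes a public country field and TrackMetadata a public artist field.

import java.util.Objects;
import org.apache.crunch.MapFn;
import org.apache.crunch.Pair;

public class CrunchStreamExampleHelpers {

  /** Output record: an (artist, country) key plus a stream count. */
  public static class ArtistCountryStreams {
    public static class Key {
      public final String artist;
      public final String country;
      public Key() { this(null, null); }            // no-arg constructor for Avro reflection
      public Key(String artist, String country) {
        this.artist = artist;
        this.country = country;
      }
      @Override public boolean equals(Object o) {   // value equality, useful at least for in-memory tests
        if (!(o instanceof Key)) return false;
        Key k = (Key) o;
        return Objects.equals(artist, k.artist) && Objects.equals(country, k.country);
      }
      @Override public int hashCode() { return Objects.hash(artist, country); }
    }

    public final Key key;
    public final long streams;
    public ArtistCountryStreams() { this(null, 0L); } // for Avro reflection
    public ArtistCountryStreams(Key key, long streams) {
      this.key = key;
      this.streams = streams;
    }

    /** Maps the (Key, count) pairs produced by count() to output records. */
    public static class Create extends MapFn<Pair<Key, Long>, ArtistCountryStreams> {
      @Override public ArtistCountryStreams map(Pair<Key, Long> input) {
        return new ArtistCountryStreams(input.first(), input.second());
      }
    }
  }

  /** Builds the grouping key from the ((message, user), trackMetadata) join result. */
  public static class CreateArtistCountryStreamsKey
      extends MapFn<Pair<Pair<TrackPlayedMessage, UserInfo>, TrackMetadata>, ArtistCountryStreams.Key> {
    @Override
    public ArtistCountryStreams.Key map(Pair<Pair<TrackPlayedMessage, UserInfo>, TrackMetadata> in) {
      return new ArtistCountryStreams.Key(in.second().artist, in.first().second().country);
    }
  }

  /** Lifts a key extractor over the first element of a Pair, so one join's result
      can be keyed again for the next join. The Class parameter is unused here;
      presumably it aids type inference in the original code. */
  public static <S, T, K> MapFn<Pair<S, T>, K> mapFirst(final MapFn<S, K> fn, Class<T> secondType) {
    return new MapFn<Pair<S, T>, K>() {
      @Override public K map(Pair<S, T> input) { return fn.map(input.first()); }
    };
  }
}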
public static class TrackPlayedMessage {
public final String username;
public final int trackId;
public final long timestamp;
// No-arg constructor used by Avro reflection (Avros.reflects).
public TrackPlayedMessage() {
username = null;
trackId = 0;
timestamp = 0;
}
public TrackPlayedMessage(String username, int trackId, long timestamp) {
this.username = username;
this.trackId = trackId;
this.timestamp = timestamp;
}
public static class Username extends MapFn<TrackPlayedMessage, String> {
@Override
public String map(TrackPlayedMessage input) {
return input.username;
}
}
public static class TrackId extends MapFn<TrackPlayedMessage, Integer> {
@Override
public Integer map(TrackPlayedMessage input) {
return input.trackId;
}
}
}
private PCollection<TrackPlayedMessage> messages = MemPipeline.typedCollectionOf(
Avros.reflects(TrackPlayedMessage.class),
new TrackPlayedMessage("adam", 1001, 200000L),
new TrackPlayedMessage("brian", 1001, 200001L),
new TrackPlayedMessage("charlie", 1002, 200002L),
new TrackPlayedMessage("dave", 1003, 200003L),
new TrackPlayedMessage("dave", 1004, 200004L)
);
private PCollection<TrackMetadata> trackMeta = MemPipeline.typedCollectionOf(
Avros.reflects(TrackMetadata.class),
new TrackMetadata(1001, "track1", "artistA"),
new TrackMetadata(1002, "track2", "artistA"),
new TrackMetadata(1003, "track1", "artistB"),
new TrackMetadata(1004, "track2", "artistB")
);
private PCollection<UserInfo> userInfo = MemPipeline.typedCollectionOf(
Avros.reflects(UserInfo.class),
new UserInfo("adam", SubscriptionLevel.FREE, "PL"),
new UserInfo("brian", SubscriptionLevel.FREE, "US"),
new UserInfo("charlie", SubscriptionLevel.PREMIUM, "US"),
new UserInfo("dave", SubscriptionLevel.PREMIUM, "GB")
);
@Test
public void testComputeStreams() throws Exception {
CrunchStreamExample crunchStreamExample = new CrunchStreamExample();
List<ArtistCountryStreams> output =
Lists.newArrayList(
crunchStreamExample.computeStreams(messages, userInfo, trackMeta)
.materialize());
// TODO: fill in the expected (artist, country, streams) records for the fixture data above.
List<ArtistCountryStreams> expected = Lists.newArrayList();
assertEquals(expected, output);
}
object CrunchStreamExample {
class TrackPlayedMessage(val username: String, val trackId: Int, val timestamp: Long)
class UserInfo(val username: String, val isPremium: Boolean, val country: String)
class TrackMetaData(val trackId: Int, val title: String, val artist: String)
class ArtistCountryKey(val artist: String, val country: String)
class ArtistCountryStreams(val key: ArtistCountryKey, val streams: Long)
def computeStreams(trackPlayedMessages: PCollection[TrackPlayedMessage],
userInfos: PCollection[UserInfo],
trackMetadatas: PCollection[TrackMetaData])
: PCollection[ArtistCountryStreams] =
trackPlayedMessages.by(_.username)
.innerJoin(userInfos.by(_.username))
.values()
.by({ case (msg, user) => msg.trackId })
.innerJoin(trackMetadatas.by(_.trackId))
.values()
.map({ case ((msg, user), meta) => new ArtistCountryKey(meta.artist, user.country)})
.count()
.map((k: ArtistCountryKey, v: Long) => new ArtistCountryStreams(k, v))
}
DEMO: what types can do for you
TRACK_PLAYED_MESSAGES = LOAD '/tpm' AS
(m_track_id, m_user_name, m_timestamp);
USER_INFO = LOAD '/user_info' AS
(u_user_name, u_country, u_subscription_level);
TRACK_METADATA = LOAD '/track_metadata' AS
(t_track_id, t_title, t_artist);
MESSAGE_USER = JOIN TRACK_PLAYED_MESSAGES BY m_user_name,
USER_INFO BY u_user_name;
MESSAGE_USER_TRACK = JOIN MESSAGE_USER BY m_track_id,
TRACK_METADATA by t_track_id;
GROUPED = GROUP MESSAGE_USER_TRACK BY (t_artist,
u_country);
RESULT = FOREACH GROUPED GENERATE group.t_artist,
group.u_country,
COUNT(MESSAGE_USER_TRACK);
STORE RESULT INTO '/output';
CREATE TABLE track_played_message (username STRING, track_id INT, timestamp INT) ...
CREATE TABLE user_info (username STRING, is_premium BOOLEAN, country STRING) ...
CREATE TABLE track_metadata (track_id INT, title STRING, artist STRING) ...
CREATE TABLE artist_country_streams (artist STRING, country STRING, streams INT) ...

INSERT OVERWRITE TABLE artist_country_streams
SELECT t.artist, u.country, count(*)
FROM track_played_message msg
JOIN user_info u ON msg.username = u.username
JOIN track_metadata t ON msg.track_id = t.track_id
GROUP BY t.artist, u.country
Data Engineers, Hadoop Experts, Software Developers
NYC and Stockholm
www.spotify.com/jobs