Scalding the Crunchy Pig for Cascading into the Hive
@dawhiting
Stockholm Sept 2013
NYC Oct 2013
@dawhiting
Stockholm Sept 2013
NYC Oct 2013
"Build a Cascade
from Flows
which connect Taps
via Pipes
built into Assemblies
to process Tuples
"
Inputs
Output
/**
 * Cascading sub-assembly computing stream counts per (artist, country).
 *
 * Joins track-played messages to user info (on user name) and to track
 * metadata (on track id), canonicalises the artist name, counts streams
 * per (canonical artist, country) pair, and emits ("artist", "country",
 * "streams") tuples as the single tail pipe.
 *
 * Note: the original declared three private Fields members
 * (trackPlayedMessageFields, userInfoFields, trackMetadata) that were
 * never read — and the last shadowed the constructor parameter of the
 * same name, so it could never be used. They have been removed.
 */
public class CascadingStreamExample extends SubAssembly {

  public CascadingStreamExample(Pipe trackPlayedMessage, Pipe userInfo, Pipe trackMetadata) {
    // Inner-join messages to users on user name.
    Pipe tpmUser = new CoGroup(
        trackPlayedMessage, new Fields("tpm.user_name"),
        userInfo, new Fields("u.user_name"),
        new InnerJoin());

    // Join the result to track metadata on track id (default joiner).
    Pipe tpmUserTrack = new CoGroup(
        tpmUser, new Fields("tpm.track_id"),
        trackMetadata, new Fields("t.track_id"));

    // Replace the raw artist field with its canonical form.
    Pipe canonicalise = new Each(
        tpmUserTrack, new Fields("t.artist"), new CanonicalArtist(), Fields.REPLACE);

    // Count streams per (canonical artist, country).
    Pipe aggregate = new AggregateBy(
        canonicalise,
        new Fields("canonical_artist", "u.country"),
        new CountBy(new Fields("count")));

    // Keep only the output fields and give them their public names.
    Pipe cut = new Retain(aggregate, new Fields("canonical_artist", "u.country", "count"));
    Pipe rename = new Rename(cut, Fields.ALL, new Fields("artist", "country", "streams"));

    this.setTails(rename);
  }
}
/**
 * Cascading function that lower-cases an artist name into a single
 * "canonical_artist" output field.
 */
public class CanonicalArtist extends BaseOperation<Object> implements Function<Object> {

  public CanonicalArtist() {
    // One input argument, one declared output field.
    super(1, new Fields("canonical_artist"));
  }

  @Override
  public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {
    String artist = functionCall.getArguments().getString(0);
    // Locale.ROOT keeps canonicalisation stable regardless of the JVM's
    // default locale (e.g. the Turkish dotless-i problem with a bare
    // toLowerCase()).
    String canonicalArtist = artist.toLowerCase(java.util.Locale.ROOT);
    functionCall.getOutputCollector().add(new Tuple(canonicalArtist));
  }
}
/**
 * Scalding job computing stream counts per (artist, country).
 *
 * Fix: the user-info join referenced `userinfoSchema` (lower-case i),
 * which does not exist — the declared value is `userInfoSchema`. This
 * would not have compiled.
 */
class ArtistCountryStreams(args: Args) extends Job(args) {
  val trackPlayedMessageSchema = ('tpmTrackId, 'tpmUserName, 'tpmTimestamp)
  val userInfoSchema = ('uUserName, 'uCountry, 'uSubscriptionLevel)
  val trackMetadataSchema = ('tTrackId, 'tTitle, 'tArtist)

  // NOTE(review): "userInfo" and "trackMetadata" are literal source names
  // while the first source comes from args("tpm") — confirm whether they
  // should also be args(...) lookups.
  SomeSource(args("tpm"), trackPlayedMessageSchema)
    // Inner joins; the right-hand side is the smaller pipe in each case.
    .joinWithSmaller('tpmUserName -> 'uUserName, SomeSource("userInfo", userInfoSchema))
    .joinWithSmaller('tpmTrackId -> 'tTrackId, SomeSource("trackMetadata", trackMetadataSchema))
    // Canonicalise the artist name.
    .map('tArtist -> 'canonArtist) { tArtist: String => tArtist.toLowerCase }
    .project('canonArtist, 'uCountry)
    // Count streams per (canonical artist, country).
    .groupBy('canonArtist, 'uCountry) { _.size('streams) }
    .rename(('canonArtist, 'uCountry) -> ('artist, 'country))
}
Works with real types with a strategy based on lazy collections
PCollection<T>
PTable<K,V>
PGroupedTable<K,V>
MapFn<T1,T2>
: T1 → T2
CombineFn<K,V>
: (K, Iterable<V>) → (K, V)

/**
 * Crunch pipeline: joins plays to users (by user name) and then to track
 * metadata (by track id), builds an (artist, country) key per play,
 * counts occurrences, and maps the counts to the output record type.
 */
public PCollection<ArtistCountryStreams> computeStreams(
    PCollection<TrackPlayedMessage> trackPlayedMessages,
    PCollection<UserInfo> userInfos,
    PCollection<TrackMetadata> trackMetadatas) {

  // Key both sides by user name, inner-join, and keep only the values.
  PCollection<Pair<TrackPlayedMessage, UserInfo>> messageUser =
      Join.innerJoin(
              trackPlayedMessages.by(new TrackPlayedMessage.Username(), Avros.strings()),
              userInfos.by(new UserInfo.Username(), Avros.strings()))
          .values();

  // Re-key by track id, join in the metadata, then key/count/convert.
  return Join.innerJoin(
          messageUser.by(mapFirst(new TrackPlayedMessage.TrackId(), UserInfo.class), Avros.ints()),
          trackMetadatas.by(new TrackMetadata.TrackId(), Avros.ints()))
      .values()
      .parallelDo(new CreateArtistCountryStreamsKey(),
          Avros.reflects(ArtistCountryStreams.Key.class))
      .count()
      .parallelDo(new ArtistCountryStreams.Create(),
          Avros.reflects(ArtistCountryStreams.class));
}
/**
 * Immutable value type for a single "track played" event.
 *
 * Fields are public final and there is a no-arg constructor leaving them
 * at default values — presumably required for Avro reflect serialization
 * (this class is passed to Avros.reflects elsewhere in the deck); confirm.
 */
public static class TrackPlayedMessage {
  public final String username;
  public final int trackId;
  public final long timestamp;

  // No-arg constructor: final fields must still be assigned explicitly.
  public TrackPlayedMessage() {
    username = null;
    trackId = 0;
    timestamp = 0;
  }

  public TrackPlayedMessage(String username, int trackId, long timestamp) {
    this.username = username;
    this.trackId = trackId;
    this.timestamp = timestamp;
  }

  /** Key extractor: user name, used to join plays to user info. */
  public static class Username extends MapFn<TrackPlayedMessage, String> {
    @Override
    public String map(TrackPlayedMessage input) {
      return input.username;
    }
  }

  /** Key extractor: track id, used to join plays to track metadata. */
  public static class TrackId extends MapFn<TrackPlayedMessage, Integer> {
    @Override
    public Integer map(TrackPlayedMessage input) {
      return input.trackId;
    }
  }
}
// In-memory fixtures for the Crunch pipeline test.
// Five plays: adam/1001, brian/1001, charlie/1002, dave/1003, dave/1004.
private PCollection<TrackPlayedMessage> messages = MemPipeline.typedCollectionOf(
    Avros.reflects(TrackPlayedMessage.class),
    new TrackPlayedMessage("adam", 1001, 200000L),
    new TrackPlayedMessage("brian", 1001, 200001L),
    new TrackPlayedMessage("charlie", 1002, 200002L),
    new TrackPlayedMessage("dave", 1003, 200003L),
    new TrackPlayedMessage("dave", 1004, 200004L)
);
// Tracks 1001-1002 belong to artistA, 1003-1004 to artistB.
private PCollection<TrackMetadata> trackMeta = MemPipeline.typedCollectionOf(
    Avros.reflects(TrackMetadata.class),
    new TrackMetadata(1001, "track1", "artistA"),
    new TrackMetadata(1002, "track2", "artistA"),
    new TrackMetadata(1003, "track1", "artistB"),
    new TrackMetadata(1004, "track2", "artistB")
);
private PCollection<UserInfo> userInfo = MemPipeline.typedCollectionOf(
    Avros.reflects(UserInfo.class),
    new UserInfo("adam", SubscriptionLevel.FREE, "PL"),
    new UserInfo("brian", SubscriptionLevel.FREE, "US"),
    new UserInfo("charlie", SubscriptionLevel.PREMIUM, "US"),
    new UserInfo("dave", SubscriptionLevel.PREMIUM, "GB")
);

@Test
public void testComputeStreams() throws Exception {
  CrunchStreamExample crunchStreamExample = new CrunchStreamExample();
  List<ArtistCountryStreams> output = Lists.newArrayList(
      crunchStreamExample.computeStreams(messages, userInfo, trackMeta)
          .materialize());
  // NOTE(review): `expected` is empty, so this asserts the pipeline
  // produces NO output — yet the fixtures above should yield
  // (artistA, PL, 1), (artistA, US, 2), (artistB, GB, 2). This looks
  // like a placeholder left for the live demo; confirm and fill in the
  // expected records (requires ArtistCountryStreams to define equals).
  List<ArtistCountryStreams> expected = Lists.newArrayList();
  assertEquals(expected, output);
}
/**
 * Scrunch (Scala Crunch) version of the streams-per-(artist, country)
 * pipeline, with lightweight model classes for the three inputs.
 */
object CrunchStreamExample {
  class TrackPlayedMessage(val username: String, val trackId: Int, val timestamp: Long)
  class UserInfo(val username: String, val isPremium: Boolean, val country: String)
  class TrackMetaData(val trackId: Int, val title: String, val artist: String)
  class ArtistCountryKey(val artist: String, val country: String)
  class ArtistCountryStreams(val key: ArtistCountryKey, val streams: Long)

  /**
   * Joins plays to users on user name and to metadata on track id, then
   * counts streams per (artist, country) key.
   */
  def computeStreams(trackPlayedMessages: PCollection[TrackPlayedMessage],
                     userInfos: PCollection[UserInfo],
                     trackMetadatas: PCollection[TrackMetaData])
      : PCollection[ArtistCountryStreams] =
    trackPlayedMessages.by(_.username)
      .innerJoin(userInfos.by(_.username))
      .values()
      // Re-key the (play, user) pairs by track id for the metadata join.
      .by { case (play, _) => play.trackId }
      .innerJoin(trackMetadatas.by(_.trackId))
      .values()
      // One (artist, country) key per play, then count and wrap.
      .map { case ((_, u), track) => new ArtistCountryKey(track.artist, u.country) }
      .count()
      .map((key: ArtistCountryKey, total: Long) => new ArtistCountryStreams(key, total))
}
DEMO: what types can do for you
-- Load the three inputs.
TRACK_PLAYED_MESSAGES = LOAD '/tpm' AS (m_track_id, m_user_name, m_timestamp);
USER_INFO = LOAD '/user_info' AS (u_user_name, u_country, u_subscription_level);
TRACK_METADATA = LOAD '/track_metadata' AS (t_track_id, t_title, t_artist);

-- Inner joins: messages -> users on user name, then -> metadata on track id.
MESSAGE_USER = JOIN TRACK_PLAYED_MESSAGES BY m_user_name, USER_INFO BY u_user_name;
MESSAGE_USER_TRACK = JOIN MESSAGE_USER BY m_track_id, TRACK_METADATA BY t_track_id;

-- Fix: grouping by multiple fields requires them in parentheses;
-- "BY t_artist, u_country" is a syntax error.
GROUPED = GROUP MESSAGE_USER_TRACK BY (t_artist, u_country);

-- Fix: OUTPUT is a reserved word in Pig Latin, so the alias is RESULT.
-- FLATTEN(group) expands the (artist, country) group key into two columns.
RESULT = FOREACH GROUPED GENERATE FLATTEN(group), COUNT(MESSAGE_USER_TRACK);
STORE RESULT INTO '/output';
-- Table schemas. `timestamp` is backticked because it collides with the
-- Hive TIMESTAMP data-type keyword, and it is BIGINT to match the Java
-- model's `long timestamp` (epoch values overflow INT).
CREATE TABLE track_played_message (username STRING, track_id INT, `timestamp` BIGINT) ...
CREATE TABLE user_info (username STRING, is_premium BOOLEAN, country STRING) ...
CREATE TABLE track_metadata (track_id INT, title STRING, artist STRING) ...
CREATE TABLE artist_country_streams (artist STRING, country STRING, streams INT) ...

-- Count streams per (artist, country) across both inner joins.
INSERT OVERWRITE TABLE artist_country_streams
SELECT t.artist, u.country, count(*)
FROM track_played_message msg
JOIN user_info u ON msg.username = u.username
JOIN track_metadata t ON msg.track_id = t.track_id
GROUP BY t.artist, u.country
Data Engineers, Hadoop Experts, Software Developers
NYC and Stockholm
www.spotify.com/jobs