Scalding the Crunchy Pig for Cascading into the Hive

@dawhiting

Stockholm Sept 2013
NYC Oct 2013

What should I use?

What's wrong with plain ol' MapReduce?

What features can we have?

The Cascading family
(tuple pipeline model)

Concepts

"Build a Cascade from Flows which connect Taps via Pipes built into Assemblies to process Tuples"

Pipes

The example pipeline

Inputs

Output
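Before the framework versions, the example pipeline can be sketched framework-free in plain Java: inner-join plays to users on username and to track metadata on track id, lowercase the artist, and count streams per (artist, country). This is a reference for what every version below computes; the record and method names here are illustrative assumptions, not part of any framework's API.

```java
import java.util.*;

// Plain-Java sketch of the example pipeline. Record/method names are
// illustrative assumptions; every framework example below computes this.
public class PipelineSketch {
    record Play(int trackId, String username, long timestamp) {}
    record User(String username, String country, String level) {}
    record Track(int trackId, String title, String artist) {}

    static Map<List<String>, Long> artistCountryStreams(
            List<Play> plays, List<User> users, List<Track> tracks) {
        Map<String, User> usersByName = new HashMap<>();
        users.forEach(u -> usersByName.put(u.username(), u));
        Map<Integer, Track> tracksById = new HashMap<>();
        tracks.forEach(t -> tracksById.put(t.trackId(), t));

        Map<List<String>, Long> counts = new HashMap<>();
        for (Play p : plays) {
            User u = usersByName.get(p.username());   // inner join on username
            Track t = tracksById.get(p.trackId());    // inner join on track id
            if (u == null || t == null) continue;     // inner join: drop non-matches
            String artist = t.artist().toLowerCase(); // canonicalise artist
            counts.merge(List.of(artist, u.country()), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        var plays = List.of(new Play(1001, "adam", 1L), new Play(1001, "brian", 2L));
        var users = List.of(new User("adam", "PL", "FREE"), new User("brian", "US", "FREE"));
        var tracks = List.of(new Track(1001, "track1", "ArtistA"));
        System.out.println(artistCountryStreams(plays, users, tracks));
    }
}
```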

Programming with Cascading: Constructing the flow

public class CascadingStreamExample extends SubAssembly {

    private Fields trackPlayedMessageFields =
        new Fields("tpm.track_id", "tpm.user_name", "tpm.timestamp");
    private Fields userInfoFields =
        new Fields("u.user_name", "u.country", "u.subscription_level");
    private Fields trackMetadataFields =
        new Fields("t.track_id", "t.title", "t.artist");

    public CascadingStreamExample(Pipe trackPlayedMessage, Pipe userInfo, Pipe trackMetadata) {
        Pipe tpmUser = new CoGroup(trackPlayedMessage,
                                   new Fields("tpm.user_name"),
                                   userInfo,
                                   new Fields("u.user_name"),
                                   new InnerJoin());

        Pipe tpmUserTrack = new CoGroup(tpmUser,
                                        new Fields("tpm.track_id"),
                                        trackMetadata,
                                        new Fields("t.track_id"));

        Pipe canonicalise = new Each(tpmUserTrack,
                                     new Fields("t.artist"),
                                     new CanonicalArtist(),
                                     Fields.REPLACE);

        Pipe aggregate = new AggregateBy(canonicalise,
                                         new Fields("canonical_artist", "u.country"),
                                         new CountBy(new Fields("count")));

        Pipe cut = new Retain(aggregate, new Fields("canonical_artist", "u.country", "count"));
        Pipe rename = new Rename(cut, Fields.ALL, new Fields("artist", "country", "streams"));
        this.setTails(rename);
    }
}
            

Programming with Cascading: custom functions

public class CanonicalArtist extends BaseOperation<Object> implements Function<Object> {
    
    public CanonicalArtist() {
        super(1, new Fields("canonical_artist"));
    }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {
        String artist = functionCall.getArguments().getString(0);
        String canonicalArtist = artist.toLowerCase();
        functionCall.getOutputCollector().add(new Tuple(canonicalArtist));
    }
}
          

Cascading: Building and testing

Cascading: summary

Scalding

Scalding: code

class ArtistCountryStreams(args: Args) extends Job(args) {
    val trackPlayedMessageSchema = ('tpmTrackId, 'tpmUserName, 'tpmTimestamp)
    val userInfoSchema = ('uUserName, 'uCountry, 'uSubscriptionLevel)
    val trackMetadataSchema = ('tTrackId, 'tTitle, 'tArtist)
    SomeSource(args("tpm"), trackPlayedMessageSchema)
        .joinWithSmaller('tpmUserName -> 'uUserName,
                         SomeSource("userInfo", userInfoSchema))
        .joinWithSmaller('tpmTrackId -> 'tTrackId,
                         SomeSource("trackMetadata", trackMetadataSchema))
        .map('tArtist -> 'canonArtist) { tArtist: String => tArtist.toLowerCase }
        .project('canonArtist, 'uCountry)
        .groupBy('canonArtist, 'uCountry) { _.size('streams) }
        .rename(('canonArtist, 'uCountry) -> ('artist, 'country))
}
          

Scalding: summary

The Crunch family
(typed functional model)

Crunch: Concepts

Works with real (user-defined) types rather than positional tuples, with an execution strategy based on lazily evaluated collections
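The lazy-collection idea can be shown with a toy sketch in plain Java: `map()` only extends a deferred plan, and no element is processed until `materialize()` is called. This is an illustration of the evaluation model behind Crunch's `PCollection`, not Crunch's actual API.

```java
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy lazy collection (NOT Crunch's API): map() records a step in a
// deferred plan; the work only happens when materialize() runs the plan.
public class LazyDemo {
    static class Lazy<T> {
        private final Supplier<List<T>> plan;
        private Lazy(Supplier<List<T>> plan) { this.plan = plan; }

        static <T> Lazy<T> of(List<T> data) { return new Lazy<>(() -> data); }

        <R> Lazy<R> map(Function<T, R> f) {
            // Just extends the plan; nothing is evaluated yet.
            return new Lazy<>(() -> plan.get().stream().map(f).toList());
        }

        List<T> materialize() { return plan.get(); } // plan executes here
    }

    public static void main(String[] args) {
        Lazy<String> lowered = Lazy.of(List.of("ArtistA", "ArtistB"))
                                   .map(String::toLowerCase); // nothing runs yet
        System.out.println(lowered.materialize()); // prints [artista, artistb]
    }
}
```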

Crunch: code

public PCollection<ArtistCountryStreams> computeStreams(
            PCollection<TrackPlayedMessage> trackPlayedMessages,
            PCollection<UserInfo> userInfos,
            PCollection<TrackMetadata> trackMetadatas)
{
    PCollection<Pair<TrackPlayedMessage, UserInfo>> messageUser =
        Join.innerJoin(
            trackPlayedMessages.by(new TrackPlayedMessage.Username(), Avros.strings()),
            userInfos.by(new UserInfo.Username(), Avros.strings()))
        .values();

    PCollection<ArtistCountryStreams> artistCountryStreams =
        Join.innerJoin(
                messageUser.by(mapFirst(new TrackPlayedMessage.TrackId(), UserInfo.class),
                               Avros.ints()),
                trackMetadatas.by(new TrackMetadata.TrackId(), Avros.ints())
            )
            .values()
            .parallelDo(new CreateArtistCountryStreamsKey(),
                        Avros.reflects(ArtistCountryStreams.Key.class))
            .count()
            .parallelDo(new ArtistCountryStreams.Create(),
                        Avros.reflects(ArtistCountryStreams.class));

    return artistCountryStreams;
}
        

Crunch: type definition

public static class TrackPlayedMessage {
    public final String username;
    public final int trackId;
    public final long timestamp;
    public TrackPlayedMessage() {
        username = null;
        trackId = 0;
        timestamp = 0;
    }
    public TrackPlayedMessage(String username, int trackId, long timestamp) {           
        this.username = username;
        this.trackId = trackId;
        this.timestamp = timestamp;
    }
    public static class Username extends MapFn<TrackPlayedMessage, String> {
        @Override
        public String map(TrackPlayedMessage input) {
            return input.username;
        }
    }
    public static class TrackId extends MapFn<TrackPlayedMessage, Integer> {
        @Override
        public Integer map(TrackPlayedMessage input) {
            return input.trackId;
        }
    }
}
          

Crunch: testing

private PCollection<TrackPlayedMessage> messages = MemPipeline.typedCollectionOf(
        Avros.reflects(TrackPlayedMessage.class),
        new TrackPlayedMessage("adam", 1001, 200000L),
        new TrackPlayedMessage("brian", 1001, 200001L),
        new TrackPlayedMessage("charlie", 1002, 200002L),
        new TrackPlayedMessage("dave", 1003, 200003L),
        new TrackPlayedMessage("dave", 1004, 200004L)
);

private PCollection<TrackMetadata> trackMeta = MemPipeline.typedCollectionOf(
        Avros.reflects(TrackMetadata.class),
        new TrackMetadata(1001, "track1", "artistA"),
        new TrackMetadata(1002, "track2", "artistA"),
        new TrackMetadata(1003, "track1", "artistB"),
        new TrackMetadata(1004, "track2", "artistB")
);

private PCollection<UserInfo> userInfo = MemPipeline.typedCollectionOf(
        Avros.reflects(UserInfo.class),
        new UserInfo("adam", SubscriptionLevel.FREE, "PL"),
        new UserInfo("brian", SubscriptionLevel.FREE, "US"),
        new UserInfo("charlie", SubscriptionLevel.PREMIUM, "US"),
        new UserInfo("dave", SubscriptionLevel.PREMIUM, "GB")
);


@Test
public void testComputeStreams() throws Exception {
    CrunchStreamExample crunchStreamExample = new CrunchStreamExample();
    List<ArtistCountryStreams> output =
        Lists.newArrayList(
            crunchStreamExample.computeStreams(messages, userInfo, trackMeta)
            .materialize());
    List<ArtistCountryStreams> expected = Lists.newArrayList(); // fill in the expected (artist, country, streams) rows
    assertEquals(expected, output);
}
          

Crunch: summary

Scrunch: Scala API for Crunch

Scrunch: code

object CrunchStreamExample {

  class TrackPlayedMessage(val username: String, val trackId: Int, val timestamp: Long)
  class UserInfo(val username: String, val isPremium: Boolean,  val country: String)
  class TrackMetaData(val trackId: Int, val title: String, val artist: String)
  class ArtistCountryKey(val artist: String, val country: String)
  class ArtistCountryStreams(val key: ArtistCountryKey, val streams: Long)

  def computeStreams(trackPlayedMessages: PCollection[TrackPlayedMessage],
                     userInfos: PCollection[UserInfo],
                     trackMetadatas: PCollection[TrackMetaData])
              : PCollection[ArtistCountryStreams] =
    trackPlayedMessages.by(_.username)
      .innerJoin(userInfos.by(_.username))
      .values()
      .by({ case (msg, user) => msg.trackId })
      .innerJoin(trackMetadatas.by(_.trackId))
      .values()
      .map({ case ((msg, user), meta) => new ArtistCountryKey(meta.artist, user.country)})
      .count()
      .map((k: ArtistCountryKey, v: Long) => new ArtistCountryStreams(k, v))
}
        

Scrunch: summary

Aside: tuples vs real types


DEMO: what types can do for you
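The tuples-vs-types contrast can be made concrete in a few lines of plain Java (names here are hypothetical, not from any of the frameworks): in a tuple model a field is a string looked up at runtime, so a typo silently yields nothing; with a real type the same typo fails to compile.

```java
import java.util.*;

// Tuples vs real types: a string-keyed lookup fails silently at runtime
// on a typo, while a typed record turns the same mistake into a compile
// error. Names are illustrative, not from any framework.
public class TuplesVsTypes {
    record TrackMetadata(int trackId, String title, String artist) {}

    public static void main(String[] args) {
        // Tuple model: fields addressed by name, checked only at runtime.
        Map<String, Object> tuple = Map.of("track_id", 1001, "artist", "ArtistA");
        Object artist = tuple.get("artsit"); // typo -> silently null

        // Typed model: the same typo would not compile.
        TrackMetadata t = new TrackMetadata(1001, "track1", "ArtistA");
        String typed = t.artist(); // t.artsit() is a compile error

        System.out.println(artist + " / " + typed); // prints null / ArtistA
    }
}
```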

Pig
(imperative-declarative tuple model)

Pig: code

TRACK_PLAYED_MESSAGES = LOAD '/tpm' AS
                        (m_track_id, m_user_name, m_timestamp);
            USER_INFO = LOAD '/user_info' AS
                        (u_user_name, u_country, u_subscription_level);
       TRACK_METADATA = LOAD '/track_metadata' AS
                        (t_track_id, t_title, t_artist);
         MESSAGE_USER = JOIN TRACK_PLAYED_MESSAGES BY m_user_name,
                             USER_INFO BY u_user_name;
   MESSAGE_USER_TRACK = JOIN MESSAGE_USER BY m_track_id,
                             TRACK_METADATA BY t_track_id;
              GROUPED = GROUP MESSAGE_USER_TRACK BY (t_artist,
                                                     u_country);
               OUTPUT = FOREACH GROUPED GENERATE $0.t_artist,
                                                 $0.u_country,
                                                 COUNT(MESSAGE_USER_TRACK);
        STORE OUTPUT INTO '/output';

Pig: summary

Hive
(declarative model)

Hive: code

CREATE TABLE track_played_message
   (username STRING, track_id INT, timestamp INT) ...
CREATE TABLE user_info
   (username STRING, is_premium BOOLEAN, country STRING) ...
CREATE TABLE track_metadata
   (track_id INT, title STRING, artist STRING) ...
CREATE TABLE artist_country_streams
   (artist STRING, country STRING, streams INT) ...

INSERT OVERWRITE TABLE artist_country_streams
SELECT t.artist, u.country, COUNT(*)
FROM track_played_message msg
JOIN user_info u ON msg.username = u.username
JOIN track_metadata t ON msg.track_id = t.track_id
GROUP BY t.artist, u.country;

Hive: summary

There is no magic bullet

Questions, comments, abuse

We're hiring

Data Engineers, Hadoop Experts, Software Developers
NYC and Stockholm

www.spotify.com/jobs