Scalding the Crunchy Pig for Cascading into the Hive

a.k.a. What's the best tool for writing Hadoop jobs?


David Whiting, Sept 2013

There is no magic bullet

What's wrong with plain ol' MapReduce?
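To make the pain concrete, here is a rough sketch (plain Java, not Hadoop API; all names hypothetical) of just the counting step of the example job, with the shuffle simulated locally. A real MapReduce job adds Mapper/Reducer classes, Writable types, and driver boilerplate on top of this, and each of the two joins the example needs would be yet another full job with tagged values.

```java
import java.util.*;

public class PlainMapReduceSketch {
    // map phase: emit one ((artist, country), 1) pair per played track
    static Map.Entry<String, Integer> map(String artist, String country) {
        return Map.entry(artist + "\t" + country, 1);
    }

    // reduce phase: sum the values collected for one key
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int v : values) sum += v;
        return sum;
    }

    // locally simulate the shuffle Hadoop performs between the two phases
    static Map<String, Integer> run(List<String[]> plays) {
        Map<String, List<Integer>> shuffled = new HashMap<>();
        for (String[] p : plays) {
            Map.Entry<String, Integer> kv = map(p[0], p[1]);
            shuffled.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        Map<String, Integer> out = new HashMap<>();
        shuffled.forEach((k, vs) -> out.put(k, reduce(vs)));
        return out;
    }
}
```

The frameworks below all exist to express this (and especially the joins) at a higher level.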

What features can we have?

The Cascading family
(tuple pipeline model)

Concepts

"Build a Cascade from Flows which connect Taps via Pipes built into Assemblies to process Tuples"
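As a loose analogy, without any Cascading dependency: a Pipe is one transformation over a stream of tuples, an Assembly is pipes composed end to end, Taps are the source and sink, and a Flow binds taps to an assembly. A minimal plain-Java sketch (names are mine, not Cascading's):

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FlowAnalogy {
    // a "pipe": one transformation over the stream of tuples
    static Function<List<String>, List<String>> upperCasePipe =
        in -> in.stream().map(String::toUpperCase).collect(Collectors.toList());

    // an "assembly": pipes composed end to end
    static Function<List<String>, List<String>> assembly =
        upperCasePipe.andThen(in -> in.stream().sorted().collect(Collectors.toList()));

    // a "flow": source tap -> assembly -> sink tap
    static List<String> runFlow(List<String> sourceTap) {
        return assembly.apply(sourceTap);
    }
}
```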

Pipes

The example pipeline

Inputs

Output

Programming with Cascading: Constructing the flow

public class CascadingStreamExample extends SubAssembly {

    private Fields trackPlayedMessageFields =
        new Fields("tpm.track_id", "tpm.user_name", "tpm.timestamp");
    private Fields userInfoFields =
        new Fields("u.user_name", "u.country", "u.subscription_level");
    private Fields trackMetadataFields =
        new Fields("t.track_id", "t.title", "t.artist");

    public CascadingStreamExample(Pipe trackPlayedMessage, Pipe userInfo, Pipe trackMetadata) {
        Pipe tpmUser = new CoGroup(trackPlayedMessage,
                                   new Fields("tpm.user_name"),
                                   userInfo,
                                   new Fields("u.user_name"),
                                   new InnerJoin());

        Pipe tpmUserTrack = new CoGroup(tpmUser,
                                        new Fields("tpm.track_id"),
                                        trackMetadata,
                                        new Fields("t.track_id"));

        Pipe canonicalise = new Each(tpmUserTrack,
                                     new Fields("t.artist"),
                                     new CanonicalArtist(),
                                     Fields.REPLACE);

        Pipe aggregate = new AggregateBy(canonicalise,
                                         new Fields("canonical_artist", "u.country"),
                                         new CountBy(new Fields("count")));

        Pipe cut = new Retain(aggregate, new Fields("canonical_artist", "u.country", "count"));
        Pipe rename = new Rename(cut, Fields.ALL, new Fields("artist", "country", "streams"));
        this.setTails(rename);
    }
}
            

Programming with Cascading: custom functions

public class CanonicalArtist extends BaseOperation<Object> implements Function<Object> {
    
    public CanonicalArtist() {
        super(1, new Fields("canonical_artist"));
    }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {
        String artist = functionCall.getArguments().getString(0);
        String canonicalArtist = artist.toLowerCase();
        functionCall.getOutputCollector().add(new Tuple(canonicalArtist));
    }
}
          

Cascading: Building and testing

Cascading: summary

Scalding

Scalding: code

class ArtistCountryStreams(args: Args) extends Job(args) {
    val trackPlayedMessageSchema = ('tpmTrackId, 'tpmUserName, 'tpmTimestamp)
    val userInfoSchema = ('uUserName, 'uCountry, 'uSubscriptionLevel)
    val trackMetadataSchema = ('tTrackId, 'tTitle, 'tArtist)
    SomeSource(args("tpm"), trackPlayedMessageSchema)
        .joinWithSmaller('tpmUserName -> 'uUserName,
                         SomeSource(args("userInfo"), userInfoSchema))
        .joinWithSmaller('tpmTrackId -> 'tTrackId,
                         SomeSource(args("trackMetadata"), trackMetadataSchema))
        .map('tArtist -> 'canonArtist) { tArtist: String => tArtist.toLowerCase }
        .project('canonArtist, 'uCountry)
        .groupBy('canonArtist, 'uCountry) { _.size('streams) }
        .rename(('canonArtist, 'uCountry) -> ('artist, 'country))
}
          

Scalding: summary

The Crunch family
(typed functional model)

Crunch: Concepts

Works with real types, using a strategy based on lazy collections
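The laziness can be illustrated with java.util.stream as a stand-in (an analogy, not the Crunch API): just as a Stream does nothing until a terminal operation, a Crunch pipeline does not touch the cluster until you call run() or materialize() on it.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyCollectionsDemo {
    public static int demo() {
        AtomicInteger calls = new AtomicInteger();
        // building the pipeline runs nothing: map() is deferred
        Stream<Integer> pipeline = List.of(1, 2, 3).stream()
            .map(x -> { calls.incrementAndGet(); return x * 2; });
        int before = calls.get();                             // still 0 here
        int sum = pipeline.mapToInt(Integer::intValue).sum(); // terminal op triggers execution
        return before == 0 ? sum : -1;                        // 2 + 4 + 6
    }
}
```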

Crunch: code

public PCollection<ArtistCountryStreams> computeStreams(
            PCollection<TrackPlayedMessage> trackPlayedMessages,
            PCollection<UserInfo> userInfos,
            PCollection<TrackMetadata> trackMetadatas)
{
    PCollection<Pair<TrackPlayedMessage, UserInfo>> messageUser =
        Join.innerJoin(
            trackPlayedMessages.by(new TrackPlayedMessage.Username(), Avros.strings()),
            userInfos.by(new UserInfo.Username(), Avros.strings()))
        .values();

    PCollection<ArtistCountryStreams> artistCountryStreams =
        Join.innerJoin(
                messageUser.by(mapFirst(new TrackPlayedMessage.TrackId(), UserInfo.class),
                               Avros.ints()),
                trackMetadatas.by(new TrackMetadata.TrackId(), Avros.ints())
            )
            .values()
            .parallelDo(new CreateArtistCountryStreamsKey(),
                        Avros.reflects(ArtistCountryStreams.Key.class))
            .count()
            .parallelDo(new ArtistCountryStreams.Create(),
                        Avros.reflects(ArtistCountryStreams.class));

    return artistCountryStreams;
}
        

Crunch: type definition

public static class TrackPlayedMessage {
    public final String username;
    public final int trackId;
    public final long timestamp;
    public TrackPlayedMessage() { // no-arg constructor required by Avro reflection
        username = null;
        trackId = 0;
        timestamp = 0;
    }
    public TrackPlayedMessage(String username, int trackId, long timestamp) {           
        this.username = username;
        this.trackId = trackId;
        this.timestamp = timestamp;
    }
    public static class Username extends MapFn<TrackPlayedMessage, String> {
        @Override
        public String map(TrackPlayedMessage input) {
            return input.username;
        }
    }
    public static class TrackId extends MapFn<TrackPlayedMessage, Integer> {
        @Override
        public Integer map(TrackPlayedMessage input) {
            return input.trackId;
        }
    }
}
          

Crunch: testing

private PCollection<TrackPlayedMessage> messages = MemPipeline.typedCollectionOf(
        Avros.reflects(TrackPlayedMessage.class),
        new TrackPlayedMessage("adam", 1001, 200000L),
        new TrackPlayedMessage("brian", 1001, 200001L),
        new TrackPlayedMessage("charlie", 1002, 200002L),
        new TrackPlayedMessage("dave", 1003, 200003L),
        new TrackPlayedMessage("dave", 1004, 200004L)
);

private PCollection<TrackMetadata> trackMeta = MemPipeline.typedCollectionOf(
        Avros.reflects(TrackMetadata.class),
        new TrackMetadata(1001, "track1", "artistA"),
        new TrackMetadata(1002, "track2", "artistA"),
        new TrackMetadata(1003, "track1", "artistB"),
        new TrackMetadata(1004, "track2", "artistB")
);

private PCollection<UserInfo> userInfo = MemPipeline.typedCollectionOf(
        Avros.reflects(UserInfo.class),
        new UserInfo("adam", SubscriptionLevel.FREE, "PL"),
        new UserInfo("brian", SubscriptionLevel.FREE, "US"),
        new UserInfo("charlie", SubscriptionLevel.PREMIUM, "US"),
        new UserInfo("dave", SubscriptionLevel.PREMIUM, "GB")
);


@Test
public void testComputeStreams() throws Exception {
    CrunchStreamExample crunchStreamExample = new CrunchStreamExample();
    List<ArtistCountryStreams> output =
        Lists.newArrayList(
            crunchStreamExample.computeStreams(messages, userInfo, trackMeta)
            .materialize());
    List<ArtistCountryStreams> expected = Lists.newArrayList();
    // for the fixtures above, expected should hold
    // (artistA, PL, 1), (artistA, US, 2) and (artistB, GB, 2)
    assertEquals(expected, output);
}
          

Crunch: summary

Scrunch: Scala API for Crunch

Scrunch: code

object CrunchStreamExample {

  class TrackPlayedMessage(val username: String, val trackId: Int, val timestamp: Long)
  class UserInfo(val username: String, val isPremium: Boolean, val country: String)
  class TrackMetaData(val trackId: Int, val title: String, val artist: String)
  class ArtistCountryKey(val artist: String, val country: String)
  class ArtistCountryStreams(val key: ArtistCountryKey, val streams: Long)

  def computeStreams(trackPlayedMessages: PCollection[TrackPlayedMessage],
                      userInfos: PCollection[UserInfo],
                      trackMetadatas: PCollection[TrackMetaData])
              : PCollection[ArtistCountryStreams] = {
    trackPlayedMessages.by(_.username)
      .innerJoin(userInfos.by(_.username))
      .values()
      .by(_._1.trackId)
      .innerJoin(trackMetadatas.by(_.trackId))
      .values()
      .map(v => new ArtistCountryKey(v._2.artist, v._1._2.country))
      .count()
      .map((k: ArtistCountryKey, v: Long) => new ArtistCountryStreams(k, v))
  }
}
        

Scrunch: summary

Aside: tuples vs real types
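The difference is easy to show in plain Java (illustrative types only): the tuple model addresses fields by position and convention, so a mistake surfaces as a runtime ClassCastException; the typed model lets the compiler check the field name and type.

```java
public class TuplesVsTypes {
    // tuple model: positional access with casts; correctness by convention only
    static String artistFromTuple(Object[] tuple) {
        return (String) tuple[2]; // position 2 had better be the artist
    }

    // typed model: field name and type are checked at compile time
    static class TrackMetadata {
        final int trackId; final String title; final String artist;
        TrackMetadata(int trackId, String title, String artist) {
            this.trackId = trackId; this.title = title; this.artist = artist;
        }
    }
    static String artistFromTyped(TrackMetadata t) {
        return t.artist;
    }
}
```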

Pig
(imperative-declarative tuple model)

Pig: code

TRACK_PLAYED_MESSAGES = LOAD '/tpm' AS
                        (m_track_id, m_user_name, m_timestamp);
            USER_INFO = LOAD '/user_info' AS
                        (u_user_name, u_country, u_subscription_level);
       TRACK_METADATA = LOAD '/track_metadata' AS
                        (t_track_id, t_title, t_artist);
         MESSAGE_USER = JOIN TRACK_PLAYED_MESSAGES BY m_user_name,
                             USER_INFO BY u_user_name;
   MESSAGE_USER_TRACK = JOIN MESSAGE_USER BY m_track_id,
                             TRACK_METADATA BY t_track_id;
              GROUPED = GROUP MESSAGE_USER_TRACK BY (t_artist,
                                                      u_country);
               OUTPUT = FOREACH GROUPED GENERATE group.t_artist,
                                                 group.u_country,
                                                 COUNT(MESSAGE_USER_TRACK);
        STORE OUTPUT INTO '/output';

Pig: summary

Hive
(declarative model)

Hive: code

CREATE TABLE track_played_message
   (username STRING, track_id INT, timestamp BIGINT) ...
CREATE TABLE user_info
   (username STRING, is_premium BOOLEAN, country STRING) ...
CREATE TABLE track_metadata
   (track_id INT, title STRING, artist STRING) ...
CREATE TABLE artist_country_streams
   (artist STRING, country STRING, streams BIGINT) ...

INSERT OVERWRITE TABLE artist_country_streams
SELECT t.artist, u.country, COUNT(*)
FROM track_played_message msg
JOIN user_info u ON msg.username = u.username
JOIN track_metadata t ON msg.track_id = t.track_id
GROUP BY t.artist, u.country;

Hive: summary

Questions, comments, abuse

There is no magic bullet