Question: Hadoop: Genomic Segments And Join
Pierre Lindenbaum (France/Nantes/Institut du Thorax - INSERM UMR1087) wrote, 9.4 years ago:

This is a question I asked two years ago on Stack Overflow, but I wonder whether some skilled users might now have a new solution.

Say I have two BED files in a Hadoop Distributed File System. How should a MapReduce algorithm be designed to find the segments of the first collection that overlap those of the second set?

Tags: merge, genomics
Answer: Brad Chapman (Boston, MA) wrote, 9.4 years ago:

Here is a solution using Cascalog, which provides a high-level query interface on top of Hadoop, fully integrated with the Clojure programming language. By abstracting away the map/reduce details, it makes the code easier to write and to understand. You can grab the full code example from GitHub.

First, define the query. This consists of two functions: one that tests whether a start/end coordinate pair overlaps a second pair, and the Cascalog query that uses it to return only those chromosome/start/end triples from the first file that overlap a segment of the second:

(deffilterop overlaps [s1 e1 s2 e2]
  "Pass filter if the s1,e1 start/end pair overlaps the s2,e2 pair."
  ;; BED intervals are half-open: [s1,e1) and [s2,e2) overlap iff each
  ;; starts before the other ends. This also catches full containment,
  ;; which a pairwise endpoint test would miss.
  (and (< s1 e2) (> e1 s2)))

(defn run-bed-overlap
  "Cascalog query to pull data from two BED files and intersect them."
  [bed-one bed-two]
  (?<- (stdout) [?chr ?s1 ?e1]
       (bed-one ?chr ?s1 ?e1)
       (bed-two ?chr ?s2 ?e2) ; Matches chromosome with bed-one
       (overlaps ?s1 ?e1 ?s2 ?e2)))

The other part is writing the plumbing to parse BED files and feed them into the run-bed-overlap function:

(defmapop parse-bed-line [line]
  ;; split here is clojure.string/split; keep only chrom, start, end
  (take 3 (split line #"\t")))

(defn to-int [x] (Integer/parseInt x))

(defn bed-from-hfs
  "With a BED file from HDFS, produce chromosome, start, end."
  [dir]
  (let [source (hfs-textline dir)]
    (<- [?chr ?s ?e]
        (source ?line)
        (parse-bed-line ?line :> ?chr ?s-str ?e-str)
        (to-int ?s-str :> ?s)
        (to-int ?e-str :> ?e))))

(defn bed-hfs-overlap [dir1 dir2]
  (run-bed-overlap (bed-from-hfs dir1) (bed-from-hfs dir2)))

Push your BED files to HDFS and run on Hadoop with:

% cd /path/to/bed-hadoop
% lein deps
% lein uberjar
% hadoop fs -mkdir /tmp/bed-hadoop/bed-1
% hadoop fs -mkdir /tmp/bed-hadoop/bed-2
% hadoop fs -put test/one.bed /tmp/bed-hadoop/bed-1
% hadoop fs -put test/two.bed /tmp/bed-hadoop/bed-2
% hadoop jar bed-hadoop-0.0.1-SNAPSHOT-standalone.jar \
             bed_hadoop.core /tmp/bed-hadoop/bed-1 /tmp/bed-hadoop/bed-2
chr1 20 30
chr2 40 50

Comment (Jeremy Leipzig, 7.0 years ago): I don't know how anyone writes this stuff (Clojure).
Answer: Pablo wrote, 9.4 years ago:

Here are a few quick thoughts (I haven't implemented this, so apologies in advance if the explanation is messy):

Map: produce a tuple <"chr:start", "end, collectionNumber">

Partitioner: divide by chromosome? (probably not the most efficient)

Sort: tuples are sorted by key (i.e. "chr:start")

Reduce: you'll have to cheat the "MR" model and keep the open entries in memory, discarding any whose "end" is less than the current "start"; the rest of the algorithm should be standard (see the sketch below).

I hope this helps.
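
The answer stops at the design, so here is a minimal, untested sketch of how it might look in plain Hadoop Java. Everything in it is an assumption for illustration: the class names, the bed-1/bed-2 path convention used to tag which collection a record came from, and the job driver (omitted, along with setting ChrPartitioner on the job):

// A rough, untested sketch of the map/partition/sort/reduce design above.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class BedOverlap {

  // Map: emit <"chr:start", "end<TAB>collectionNumber">. Zero-padding the
  // start (nine digits covers the longest human chromosome) makes the
  // default lexicographic key sort agree with numeric order.
  public static class BedMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable off, Text line, Context ctx)
        throws IOException, InterruptedException {
      String[] f = line.toString().split("\t");
      String path = ((FileSplit) ctx.getInputSplit()).getPath().toString();
      String collection = path.contains("bed-1") ? "1" : "2"; // assumed layout
      Text key = new Text(String.format("%s:%09d", f[0], Long.parseLong(f[1])));
      ctx.write(key, new Text(f[2] + "\t" + collection));
    }
  }

  // Partition by chromosome so one reducer sees a whole chromosome's keys,
  // still sorted by start.
  public static class ChrPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text val, int numPartitions) {
      String chr = key.toString().split(":")[0];
      return (chr.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
  }

  // Reduce: a sweep over sorted starts. Keep "open" intervals in memory,
  // evicting any whose end is at or below the current start -- those can
  // never overlap later segments, since starts only increase.
  public static class OverlapReducer extends Reducer<Text, Text, Text, Text> {
    private String curChr = null;
    private final List<long[]> open1 = new ArrayList<>(); // {start,end}, set 1
    private final List<long[]> open2 = new ArrayList<>(); // {start,end}, set 2

    @Override
    protected void reduce(Text key, Iterable<Text> vals, Context ctx)
        throws IOException, InterruptedException {
      String[] k = key.toString().split(":");
      String chr = k[0];
      long start = Long.parseLong(k[1]);
      if (!chr.equals(curChr)) { // new chromosome: restart the sweep
        curChr = chr;
        open1.clear();
        open2.clear();
      }
      open1.removeIf(iv -> iv[1] <= start);
      open2.removeIf(iv -> iv[1] <= start);
      for (Text v : vals) {
        String[] f = v.toString().split("\t");
        long end = Long.parseLong(f[0]);
        if ("1".equals(f[1])) {
          // A set-1 segment overlaps anything still open from set 2.
          if (!open2.isEmpty())
            ctx.write(new Text(chr + "\t" + start + "\t" + end), new Text(""));
          open1.add(new long[] {start, end});
        } else {
          // A new set-2 segment retroactively hits open set-1 segments
          // (this can re-report a segment; deduplicate downstream).
          for (long[] iv : open1)
            ctx.write(new Text(chr + "\t" + iv[0] + "\t" + iv[1]), new Text(""));
          open2.add(new long[] {start, end});
        }
      }
    }
  }
}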


Comment (Pierre Lindenbaum, 9.4 years ago): Thanks Pablo. Does it mean that, for the reduce step, the keys will be sorted?

Comment (Pablo, 9.4 years ago): Yes, that's the standard way Hadoop works (it sorts before reducing). You can change the sorting method, but it's just easier to use a key like: String.format("%s:%08d", chr, start)

Comment (Pierre Lindenbaum, 9.4 years ago): Many thanks Pablo.
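
To make the zero-padding trick concrete, a hypothetical snippet (not from the thread; note that eight digits only covers starts up to 99,999,999, so wider padding is safer for long chromosomes):

String k1 = String.format("%s:%08d", "chr1", 99);   // "chr1:00000099"
String k2 = String.format("%s:%08d", "chr1", 100);  // "chr1:00000100"
// With padding, string order agrees with numeric order:
assert k1.compareTo(k2) < 0;
// Without padding it would not: "chr1:99" sorts after "chr1:100".
assert "chr1:99".compareTo("chr1:100") > 0;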
Answer: Helwr wrote, 9.4 years ago:

You can use Hadoop streaming and the Unix join command:

Basically, you can use Hadoop as a distributed shell without writing any Java code.

First you load the files into HDFS, then run the streaming job with the Unix cat command as the mapper and pipe the output into sort/uniq/join (and maybe some awk for formatting). This way the join is done on the HDFS client box where you run the command; for large files that single box will be the I/O bottleneck.

Alternatively, you can do the join in parallel on multiple boxes by putting it inside a script and passing that script as an argument to the streaming jar, so that each mapper does a partial join across multiple nodes; the identity reducer then aggregates the output into a single file.
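
For the client-side variant, a rough, untested sketch (the streaming jar path varies by install and is abbreviated here as $STREAMING_JAR; the awk overlap test assumes three-column, half-open BED lines):

% hadoop jar $STREAMING_JAR -input /tmp/bed-hadoop/bed-1 \
      -output /tmp/bed-1-txt -mapper /bin/cat -reducer NONE
% hadoop jar $STREAMING_JAR -input /tmp/bed-hadoop/bed-2 \
      -output /tmp/bed-2-txt -mapper /bin/cat -reducer NONE
% hadoop fs -cat /tmp/bed-1-txt/part-* | sort -k1,1 > one.txt
% hadoop fs -cat /tmp/bed-2-txt/part-* | sort -k1,1 > two.txt
% join -j 1 one.txt two.txt | awk '$2 < $5 && $3 > $4 {print $1, $2, $3}'

The join emits chr, s1, e1, s2, e2 for every same-chromosome pair, and the awk filter keeps pairs where s1 < e2 and e1 > s2, i.e. the overlapping segments of the first collection.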

