Merge of lazy streams (using generators) in Python

I’m playing with the functional capabilities of Python 3, and I tried to implement the classical algorithm for calculating Hamming numbers. Those are the numbers whose only prime factors are 2, 3 or 5. The first Hamming numbers are 2, 3, 4, 5, 6, 8, 9, 10, 12, 15, 16, 18, 20 and so on.

My implementation is the following:

def scale(s, m):
    return (x*m for x in s)

def merge(s1, s2):
    it1, it2 = iter(s1), iter(s2)
    x1, x2 = next(it1), next(it2)
    if x1 < x2:
        x = x1
        it = iter(merge(it1, s2))
    elif x1 > x2:
        x = x2
        it = iter(merge(s1, it2))
    else:
        x = x1
        it = iter(merge(it1, it2))
    yield x
    while True: yield next(it)

def integers():
    n = 0
    while True:
        n += 1
        yield n

m2 = scale(integers(), 2)
m3 = scale(integers(), 3)
m5 = scale(integers(), 5)

m23 = merge(m2, m3)

hamming_numbers = merge(m23, m5)

The problem is that merge just doesn’t seem to work. Before that, I implemented the Sieve of Eratosthenes the same way, and it worked perfectly well:

def sieve(s):
    it = iter(s)
    x = next(it)
    yield x
    it = iter(sieve(filter(lambda y: y % x, it)))
    while True: yield next(it)

This one uses the same techniques as my merge operation. So I can’t see any difference. Do you have any ideas?

(I know that all of these can be implemented in other ways, but my goal is exactly to understand generators and the pure functional capabilities of Python, including recursion, without using class declarations or special pre-built Python functions.)

UPD: For Will Ness, here’s my implementation of this algorithm in Lisp (Racket, actually):

(define (scale str m)
  (stream-map (lambda (x) (* x m)) str))

(define (integers-from n)
  (stream-cons n
               (integers-from (+ n 1))))

(define (merge s1 s2)
  (let ((x1 (stream-first s1))
        (x2 (stream-first s2)))
    (cond ((< x1 x2)
           (stream-cons x1 (merge (stream-rest s1) s2)))
          ((> x1 x2)
           (stream-cons x2 (merge s1 (stream-rest s2))))
          (else
           (stream-cons x1 (merge (stream-rest s1) (stream-rest s2)))))))


(define integers (integers-from 1))

(define hamming-numbers
  (stream-cons 1 (merge (scale hamming-numbers 2)
                        (merge (scale hamming-numbers 3)
                               (scale hamming-numbers 5)))))

Answer

Your algorithm is incorrect. Your m2, m3, m5 should be scaling hamming_numbers, not integers.

The major problem is this: your merge() calls next() on both of its arguments unconditionally, so both get advanced by one step. After producing the first number, e.g. 2 for the m23 generator, the next invocation sees its first argument as 4 (, 6, 8, ...) and its second as 6 (, 9, 12, ...). The 3 is already gone. So it always pulls from both of its arguments, and always returns the head of the first (test entry at http://ideone.com/doeX2Q).
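
To see the lost element concretely, here is a minimal sketch that restates the question's merge() in condensed form, with the iter() calls dropped. The name broken_merge and the use of itertools.count for the two input streams are my own choices for illustration, not part of the original code.

```python
from itertools import count, islice

def broken_merge(s1, s2):
    # Both inputs are advanced on every call, whichever value gets used.
    x1, x2 = next(s1), next(s2)
    if x1 < x2:
        x, rest = x1, broken_merge(s1, s2)   # x2 is dropped here
    elif x1 > x2:
        x, rest = x2, broken_merge(s1, s2)   # x1 is dropped here
    else:
        x, rest = x1, broken_merge(s1, s2)
    yield x
    yield from rest

m2 = (2 * n for n in count(1))   # 2, 4, 6, ...
m3 = (3 * n for n in count(1))   # 3, 6, 9, ...
first = list(islice(broken_merge(m2, m3), 6))
print(first)   # [2, 4, 6, 8, 10, 12]
```

The 3, 9 and 15 never appear, because each recursive call consumes a fresh element from the second stream and then discards it.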

Calling iter() is totally superfluous; it adds nothing here. When I remove it (http://ideone.com/7tk85h), the program works exactly the same and produces exactly the same (wrong) output. Normally iter() serves to create an iterator from an iterable, but its arguments here are already generators, which are their own iterators.

There’s no need to call iter() in your sieve() either (http://ideone.com/kYh7Di). sieve() already defines a generator, and filter() in Python 3 creates an iterator from a function and an iterable (generators are iterable). See also e.g. Difference between Python’s Generators and Iterators.
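
A quick way to check this is an identity test: calling iter() on a generator or on a filter object hands back the very same object. The snippet below is only an illustration; the variable names are made up for it.

```python
def integers():
    n = 0
    while True:
        n += 1
        yield n

gen = integers()
same_gen = iter(gen) is gen          # a generator is its own iterator

flt = filter(lambda y: y % 2, gen)
same_flt = iter(flt) is flt          # so is a filter object in Python 3

lst = [1, 2, 3]
same_lst = iter(lst) is lst          # a list is iterable but not an iterator

print(same_gen, same_flt, same_lst)  # True True False
```

So iter() only makes a difference for plain containers such as lists, which the code in the question never passes around.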

We can do the merge like this, instead:

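def merge(s1, s2):
    x1, x2 = next(s1), next(s2)          # current head of each stream
    while True:
        if x1 < x2:
            yield x1
            x1 = next(s1)                # advance only the stream we took from
        elif x1 > x2:
            yield x2
            x2 = next(s2)
        else:                            # equal heads: emit once, advance both
            yield x1
            x1, x2 = next(s1), next(s2)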
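
With a merge of this shape, the correction from the start of this answer (scaling hamming_numbers rather than integers) can be sketched as follows. This is only one possible arrangement, and it rests on an assumption of mine: hamming_numbers is written as a function, so every recursive call builds a fresh copy of the stream. The Racket definition in the question instead refers back to one shared stream.

```python
from itertools import islice

def scale(s, m):
    return (x * m for x in s)

def merge(s1, s2):
    x1, x2 = next(s1), next(s2)
    while True:
        if x1 < x2:
            yield x1
            x1 = next(s1)
        elif x1 > x2:
            yield x2
            x2 = next(s2)
        else:
            yield x1
            x1, x2 = next(s1), next(s2)

def hamming_numbers():
    yield 1  # seed, as in the Racket version's (stream-cons 1 ...)
    # Each recursive call builds a fresh, independent copy of the stream.
    yield from merge(scale(hamming_numbers(), 2),
                     merge(scale(hamming_numbers(), 3),
                           scale(hamming_numbers(), 5)))

first = list(islice(hamming_numbers(), 15))
print(first)  # [1, 2, 3, 4, 5, 6, 8, 9, 10, 12, 15, 16, 18, 20, 24]
```

Because each copy recomputes the same values, this sketch does a lot of duplicated work and is meant for reading, not for producing long runs of the sequence.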

Recursion in itself is non-essential in defining the sieve() function, too. In fact, it only serves to obscure an enormous deficiency of that code: any prime it produces will be tested by all the primes below it in value, but only those up to its square root are truly needed. We can fix this quite easily in a non-recursive(*) style (http://ideone.com/Qaycpe):

# integers_from(n) is assumed to be a generator yielding n, n+1, n+2, ...
def sieve(s):    # call as: sieve(integers_from(2))
    x = next(s)
    yield x
    ps = sieve(integers_from(2))            # independent primes supply
    p = next(ps)
    q = p*p
    while True:
        x = next(s)
        while x < q:
            yield x
            x = next(s)
        # here x == q
        s = filter(lambda y, p=p: y % p, s)  # filter creation postponed
        p = next(ps)                         #   until square of p seen in input
        q = p*p

(*)(well, actually, this is also recursive, but in a very different way than before)

This is now much, much, much more efficient (see also: Explain this chunk of Haskell code that outputs a stream of primes).
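
One rough way to check the efficiency claim is to count the divisibility tests each variant performs. The sketch below is not a benchmark. The tests counter, the not_multiple_of helper, the run function and the names eager_sieve and postponed_sieve are all hypothetical, and itertools.count(2) stands in for integers_from(2).

```python
from itertools import count, islice

tests = [0]  # hypothetical counter of divisibility tests performed

def not_multiple_of(p):
    def check(y):
        tests[0] += 1
        return y % p
    return check

def eager_sieve(s):
    # The question's approach: a new filter as soon as each prime is found.
    x = next(s)
    yield x
    yield from eager_sieve(filter(not_multiple_of(x), s))

def postponed_sieve(s):
    # The approach above: the filter for p is added only once p*p is reached.
    x = next(s)
    yield x
    ps = postponed_sieve(count(2))   # independent primes supply
    p = next(ps)
    q = p * p
    while True:
        x = next(s)
        while x < q:
            yield x
            x = next(s)
        s = filter(not_multiple_of(p), s)
        p = next(ps)
        q = p * p

def run(sieve, n):
    tests[0] = 0
    primes = list(islice(sieve(count(2)), n))
    return primes, tests[0]

eager_primes, eager_tests = run(eager_sieve, 100)
postponed_primes, postponed_tests = run(postponed_sieve, 100)
print(eager_primes == postponed_primes, eager_tests, postponed_tests)
```

Both variants produce the same primes, but the postponed one performs far fewer tests, and the gap widens as more primes are requested.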

Whether the code is recursive or not is just a syntactic characteristic. The actual run-time structures are the same: the filter() adaptors get stacked on top of an input stream, either at the appropriate moments or way too soon (so we’d end up with way too many of them).

User contributions licensed under: CC BY-SA