Joachim Breitner

On taking the last n elements of a list

Published 2013-06-22 in sections English, Haskell.

When you are programming in Haskell, one of the most commonly used data structures is the list. As these are implemented as singly-linked lists, you should usually only work with the beginning of the list, adding and removing elements there, and traversing the list from left to right (either lazily with a right fold, or with an accumulator and a strict left fold). Hence it is no surprise that the standard library comes with take (which takes the first n elements of the list), but no takeR (which would return the last n elements of the list).

Naive code

But even in properly written Haskell code, you sometimes have the need for a takeR function. And of course it is easily implemented as

takeRNaive :: Int -> [a] -> [a]
takeRNaive n = reverse . take n . reverse 

which works just fine, but does not have nice performance properties: It will evaluate the argument list completely, creating a separate list, before returning the first bit of the result, and hence keeps all elements in memory. This is not ok if you are deliberately relying on lazy lists for good performance.

As criterion tells me, using this function to take the last 100 000 elements of a 100 000-element list (which is [1..100000], so the list generation is really fast) takes 9.9ms; returning just the last 100 elements takes 4.2ms.

The imperative code

At first I could not think of an obvious functional way to solve the problem. But that is not a problem: after all, you can use your imperative thinking in Haskell as well and, thanks to the ST monad, even include imperative algorithms with destructive updates in pure code in a safe manner.

The obvious imperative algorithm is a buffer, sized to store the number of elements requested, that is filled in a cyclic manner until the input list is exhausted; the result list can then be reconstructed from it. The code is naturally a bit larger:

takeRArray :: forall a. Int -> [a] -> [a]
takeRArray n l | n <= 0 = []
takeRArray n l = runST stAction
  where
    stAction :: forall s. ST s [a]
    stAction = do
        buffer <- newArray_ (0, n-1)
        i <- go (buffer :: STArray s Int a) 0 l
        let s = min i n
        sequence $ [ readArray buffer (j `mod` n) | j <- [i-s..i-1] ]
    go buffer i [] = return i
    go buffer i (x:xs) = writeArray buffer (i `mod` n) x >> go buffer (i+1) xs

The stAction is the imperative part that modifies a buffer; go is the inner loop that keeps track of the current position and fills the values of the list there. I use Data.Array.MArray here; using Data.Vector.Mutable shows similar performance behaviour.
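To make the index arithmetic concrete, here is a small sketch that computes, without any mutable array, which buffer slots the final list comprehension reads and in which order. The name readOrder is made up for this illustration and is not part of the code above.

```haskell
-- Hypothetical helper, for illustration only: the buffer slots that the
-- final list comprehension in stAction reads, in order.
--   n   = buffer size (number of requested elements), assumed > 0
--   len = number of elements written, i.e. the value returned by go
readOrder :: Int -> Int -> [Int]
readOrder n len = [ j `mod` n | j <- [len - s .. len - 1] ]
  where
    s = min len n  -- number of elements actually available

-- For n = 4 and a 7-element input the writes wrap around once, so the
-- oldest surviving element sits in slot 3:
--   readOrder 4 7 == [3,0,1,2]
-- For an input shorter than the buffer nothing wraps:
--   readOrder 4 2 == [0,1]
```

Reading starts at the slot that would have been overwritten next, which is why the result comes out in the original order.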

This code has some non-trivial indexing, so it is likely to contain bugs. Luckily we can use QuickCheck to quickly check that takeRArray indeed implements takeRNaive:

quickCheck (\n l -> n <= 100000 ==> takeRNaive n l == takeRArray n (l::[Int]))
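If QuickCheck is not at hand, a brute-force comparison over all small inputs gives similar confidence for indexing bugs of this kind. The following sketch uses only base; agreesOnSmallInputs and smallLists are names invented here, and the bounds (n from -2 to 6, lists up to length 5) are arbitrary choices.

```haskell
-- Reference implementation, as given earlier in the post.
takeRNaive :: Int -> [a] -> [a]
takeRNaive n = reverse . take n . reverse

-- All lists [1..k] for k from 0 to 5. Distinct elements are enough here,
-- because the functions under test never inspect the elements.
smallLists :: [[Int]]
smallLists = [ [1 .. k] | k <- [0 .. 5] ]

-- Hypothetical checker: does a candidate agree with takeRNaive on every
-- combination of a small n (including negative ones) and a small list?
agreesOnSmallInputs :: (Int -> [Int] -> [Int]) -> Bool
agreesOnSmallInputs candidate =
  and [ candidate n l == takeRNaive n l | n <- [-2 .. 6], l <- smallLists ]
```

With takeRArray in scope, agreesOnSmallInputs takeRArray can be evaluated in GHCi; a deliberately wrong candidate such as take makes it return False.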

The code is slower for a large result list (11ms for 100000/100000) but noticeably faster for a small result and large input (1.7ms for 100/100000). The latter is likely due to takeRArray being able to release the parts of the input list that it knows will not end up in the result.

But why is the former slower? When it comes to performance, a glance at the Core code (ghc -ddump-simpl) is always enlightening. There, we find that the code for `mod` will check if the divisor is -1 or 0 in every step of the inner loop. It seems that GHC is neither smart enough to see that we excluded this case by our n <= 0 guard, nor that it can do this check once, outside the loop. We can help it by adding two additional pattern matches to the function definition:

takeRArray (-1) l = []
takeRArray 0 l = []

Now we beat the naive implementation also for the large result (9.3ms) and also improve the speed in the other test case (1.6ms).
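For reference, here is a sketch of how the pieces might be assembled into a file that loads on its own. The pragma, the imports and the explicit signature for go are not shown in the snippets above and are assumptions of this sketch; the placement of the two extra equations after the guard mirrors takeRArray2 below. Whether a current GHC still needs the extra equations is not something this sketch verifies.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}

import Control.Monad.ST (ST, runST)
import Data.Array.ST (STArray, newArray_, readArray, writeArray)

takeRArray :: forall a. Int -> [a] -> [a]
takeRArray n _ | n <= 0 = []
-- The two literal patterns are redundant given the guard above; per the
-- post, they are there to help GHC drop the divisor checks in `mod`.
takeRArray (-1) _ = []
takeRArray 0 _ = []
takeRArray n l = runST stAction
  where
    stAction :: forall s. ST s [a]
    stAction = do
        -- Slots start out undefined; only written slots are read back.
        buffer <- newArray_ (0, n - 1)
        i <- go (buffer :: STArray s Int a) 0 l
        let s = min i n
        mapM (\j -> readArray buffer (j `mod` n)) [i - s .. i - 1]

    -- Fill the buffer cyclically and return the number of elements seen.
    go :: forall s. STArray s Int a -> Int -> [a] -> ST s Int
    go _ i [] = return i
    go buffer i (x:xs) = writeArray buffer (i `mod` n) x >> go buffer (i + 1) xs
```

For example, takeRArray 4 [1..7] evaluates to [4,5,6,7], and asking for more elements than the list contains returns the whole list.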

Can we make our code more Haskellish, at least in its behaviour? Just like tail xs will share its memory representation with xs, it should be possible that takeR n xs will be shared with some suffix of xs. To achieve that we have to put complete suffixes of the input list in our buffer store, instead of just elements, and then we can simply return the suffix from the right position in the buffer:

takeRArray2 :: forall a. Int -> [a] -> [a]
takeRArray2 n l | n <= 0 = []
takeRArray2 (-1) l = []
takeRArray2 0 l = []
takeRArray2 n l = runST stAction
  where
    stAction :: forall s. ST s [a]
    stAction = do
        buffer <- newArray_ (0, n)
        i <- go (buffer :: STArray s Int [a]) 0 l
        let s = min i n
        if s <= 0 then return [] else readArray buffer ((i-s) `mod` n)
    go buffer i [] = return i
    go buffer i l@(x:xs) = writeArray buffer (i `mod` n) l >> go buffer (i+1) xs

The list in each cell will have its tail in the next cell, so this does keep our memory consumption linear in the size of the buffer (and not quadratic, as would be the case in languages without sharing). The speed-up in the case of taking 100000 of 100000 elements is great: We are at 2.4ms, ¼ of the original runtime! In the other case, the gain is hardly measurable: After rounding, we are still at 1.6ms.

So is this the function we want to use from now on? Unfortunately not: For larger lists we start to get stack overflow errors. (Strangely only when I do the benchmarking with criterion; I cannot reproduce this with GHCi.) Anyway, the code is anything but nice or short. So surely there must be a better way.

The idiomatic code

And there is, although it wasn’t obvious to me at first. The trick is to have one recursive function which traverses two lists in parallel: The input list, and the input list fast-forwarded by n elements. It keeps taking elements off both (and discarding them) until the shorter is empty. Then the other has to contain the last n elements of the input list, and we return it:

takeRIdiomatic :: Int -> [a] -> [a]
takeRIdiomatic n l = go (drop n l) l
  where
    go [] r = r
    go (_:xs) (_:ys) = go xs ys

This is again nice Haskell code: Short, no index calculations (actually, no calculation at all, if you don’t count what happens inside drop), no special handling of bad input required (as it turns out to do just the right thing for negative n), nice sharing behaviour. And it even outperforms all of the variants above: 0.9ms for the larger result and 0.4ms for the smaller. This is a function I can now happily recommend to use for this use case.
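To see why no special cases are needed, it helps to evaluate a few boundary inputs. The sketch below repeats the definition so that it can be loaded on its own; the list edgeCases is made up for this illustration.

```haskell
takeRIdiomatic :: Int -> [a] -> [a]
takeRIdiomatic n l = go (drop n l) l
  where
    go [] r = r
    go (_:xs) (_:ys) = go xs ys

-- Pairs of (actual, expected) for some boundary inputs.
edgeCases :: [([Int], [Int])]
edgeCases =
  [ (takeRIdiomatic 4 [1 .. 7], [4, 5, 6, 7])
    -- n larger than the list: drop gives [], so go returns l at once.
  , (takeRIdiomatic 10 [1, 2, 3], [1, 2, 3])
    -- n = 0: drop 0 l is l, both arguments run out together.
  , (takeRIdiomatic 0 [1, 2, 3], [])
    -- Negative n: drop treats it like 0.
  , (takeRIdiomatic (-2) [1, 2, 3], [])
    -- Empty input.
  , (takeRIdiomatic 2 [], [])
  ]
```

Evaluating all (uncurry (==)) edgeCases in GHCi yields True.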

Run time analysis

But how exactly does this work? Let us trace its behaviour with ASCII-Art, demonstrating a call to takeRIdiomatic 4 [1..7]:

Initially, the argument is unevaluated (represented by [1..7]):

[1..7]
└ takeRIdiomatic 4

Now we evaluate the function. This will call go with two arguments (for nicer drawing, the first argument is the pointer on the right):

[1..7]
├──── drop 4
└ go ─┘

As go pattern-matches on its first parameter, we need to evaluate drop. drop does not return anything until it has dropped the required number of elements, so we evaluate the first four elements of the input list:

1:2:3:4:[5..7]
│        │
└── go ──┘

Now go can do its thing: Pattern match on both arguments (hence further evaluating the list) and “advancing the pointers”. Note how the first element of the list can be freed by the garbage collector, and how no further arithmetic is needed:

1:2:3:4:5:[6..7]  1:2:3:4:5:6:[7..7]  1:2:3:4:5:6:7:[]
  └─ go ──┘           └─ go ──┘             └─ go ──┘

Now go has reached the end of the list, and simply returns the second argument, which indeed contains the last 4 elements of the original list, and even shares them with it:

1:2:3:4:5:6:7:[]
      ↑
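The same trace can be reproduced in code. This sketch records the pair of arguments at each call of go; traceGo is a name invented here, and it materialises the lists purely for inspection, so it says nothing about performance or sharing.

```haskell
-- Hypothetical instrumented variant of takeRIdiomatic: instead of only
-- returning the result, collect the (first, second) argument pair of
-- every call to go, in order.
traceGo :: Int -> [a] -> [([a], [a])]
traceGo n l = go (drop n l) l
  where
    go [] r = [([], r)]
    go a@(_:xs) b@(_:ys) = (a, b) : go xs ys

-- traceGo 4 [1..7] ==
--   [ ([5,6,7], [1,2,3,4,5,6,7])
--   , ([6,7],   [2,3,4,5,6,7])
--   , ([7],     [3,4,5,6,7])
--   , ([],      [4,5,6,7]) ]
```

The first entry corresponds to the picture right after drop has been evaluated, the next three to the steps in which go advances both pointers, and the second component of the last entry is the result.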

Conclusion

Imperative programming is possible in Haskell, but it pays off to think about idiomatic code for a moment.

Update: A German version of this text has appeared on the blog funktionale-programmierung.de.

Comments

Hi,

I thought formalizing takeR's correctness would be a good exercise in Agda:

https://github.com/goerch/agda-stuff/blob/master/takeR.agda

The only problem I saw was a missing case 'takeRIdiomatic-go (x ∷ xs) [] = []'

All the best,

Joerg
#1 goerch on 2013-07-15
The case is not necessary in Haskell, as it will never occur: the first argument is always a suffix of the second.
#2 Joachim Breitner on 2013-07-15
Here's another way to express it that should be better than your first example, but probably worse than the rest.

takeRSilly n l = drop (length l - n) l

The really nice thing about takeRIdiomatic is that it's perfectly set up for tail-call optimisation, meaning that it operates in constant stack space, which is a nice property for a function that could be working on really long lists.
#3 Thomas on 2013-07-16
Without explicit recursion:

takeR n l = foldr (const(.tail)) id (drop n l) l

or even

import Control.Monad.Reader

takeR = join . (foldr (const(.tail)) id.) . drop

ghc -O2 turns those into your last version.
#4 Timon Gehr on 2013-08-17
Very nice solution!

FWIW there is a slightly less naive version than your naive version which avoids creating a new list, has nice sharing behavior and better performance than the very naive version. It may also be a better "bridge" for getting to your optimal version.

takeR :: Int -> [a] -> [a]
takeR n l = drop (length l - n) l
#5 George Colpitts on 2013-07-27
