遠藤と申します。

以下のプログラムがやたら遅いです。また、メモリを大量に消費します。

$ time ./ruby -ve '
> a = [0] * 10000
> (1 .. 10000).map { a.slice!(0, 1) }
> '
ruby 1.9.0 (2008-03-07 revision 15720) [i686-linux]

real    0m10.940s
user    0m4.200s
sys     0m6.710s


配列の共有が原因のようです。a.slice!(i, n) は Ruby で書くと

  t = a[i, n]
  a[i, n] = []
  return t

という風に実装されていて、

1. t = a[i, n] が a と t を共有状態にする
2. a[i, n] = [] が a を rb_ary_modify する
3. rb_ary_modify の中で a 全体を MEMCPY する

という風に実行されます。これによって、

- 3 の MEMCPY が毎回 O(a.size) まるまるかかって遅い
- t が a.size バイトを無駄に確保したままになり (n で十分) 、メモリを圧迫する

というようになっています。


本質的な解決ではないですが、slice! に特化した 2 つの最適化をしてみました。

- rb_ary_splice で rb_ary_modify するとき、必要なところだけ MEMCPY する
- a.slice!(i, n) で n が a.size / 2 より小さい場合は t を共有せずに作る

ベンチマークです。単位は秒。

+----------+-------------+-----------------------------------------------+
| 最適化前 |  最適化後   |                                               |
| user sys | user  sys   | a = [0] * 10000; b = (1 .. 10000).map { ... } |
+----------+-------------+-----------------------------------------------+
| 4.2  6.7 | 0.14  0.03  | a.slice!(0, 1)                                |
| 0.26 6.6 | 0.050 0.030 | t, a = a, a.slice!(0, a.size - 1); t          |
+----------+-------------+-----------------------------------------------+


なお、この最適化を入れても、以下のコードではやっぱり遅いです。
これを何とかするのはいろいろと面倒そうです。

a = [0] * 10000
(1 .. 10000).map { t = a[0, 1]; a[0, 1] = []; t }


Index: array.c
===================================================================
--- array.c	(revision 15726)
+++ array.c	(working copy)
@@ -647,6 +647,26 @@
 }

 VALUE
+rb_ary_subseq_copy(VALUE ary, long beg, long len)
+{
+    VALUE klass, ary2;
+
+    if (beg > RARRAY_LEN(ary)) return Qnil;
+    if (beg < 0 || len < 0) return Qnil;
+
+    if (RARRAY_LEN(ary) < len || RARRAY_LEN(ary) < beg + len) {
+	len = RARRAY_LEN(ary) - beg;
+    }
+    klass = rb_obj_class(ary);
+    ary2 = ary_new(klass, len);
+    if (len > 0) {
+	MEMCPY(RARRAY_PTR(ary2), RARRAY_PTR(ary) + beg, VALUE, len);
+	RARRAY(ary2)->len = len;
+    }
+    return ary2;
+}
+
+VALUE
 rb_ary_subseq(VALUE ary, long beg, long len)
 {
     VALUE klass, ary2, shared;
@@ -973,8 +993,8 @@
 	rpl = rb_ary_to_ary(rpl);
 	rlen = RARRAY_LEN(rpl);
     }
-    rb_ary_modify(ary);
     if (beg >= RARRAY_LEN(ary)) {
+	rb_ary_modify(ary);
 	len = beg + rlen;
 	if (len >= ARY_CAPA(ary)) {
 	    RESIZE_CAPA(ary, len);
@@ -993,15 +1013,30 @@
 	}

 	alen = RARRAY_LEN(ary) + rlen - len;
-	if (alen >= ARY_CAPA(ary)) {
-	    RESIZE_CAPA(ary, alen);
+
+	rb_ary_modify_check(ary);
+	if (ARY_SHARED_P(ary)) {
+	    VALUE *ptr;
+	    ptr = ALLOC_N(VALUE, alen);
+	    FL_UNSET(ary, ELTS_SHARED);
+	    MEMCPY(ptr, RARRAY_PTR(ary), VALUE, beg);
+	    MEMCPY(ptr + beg + rlen, RARRAY_PTR(ary) + beg + len,
+		   VALUE, RARRAY_LEN(ary) - (beg + len));
+	    RARRAY(ary)->aux.capa = RARRAY(ary)->len = alen;
+	    RARRAY(ary)->ptr = ptr;
 	}
+	else {
+	    if (alen >= ARY_CAPA(ary)) {
+		RESIZE_CAPA(ary, alen);
+	    }
+	    if (len != rlen) {
+		MEMMOVE(RARRAY_PTR(ary) + beg + rlen,
+			RARRAY_PTR(ary) + beg + len,
+			VALUE, RARRAY_LEN(ary) - (beg + len));
+		RARRAY(ary)->len = alen;
+	    }
+	}

-	if (len != rlen) {
-	    MEMMOVE(RARRAY_PTR(ary) + beg + rlen, RARRAY_PTR(ary) + beg + len,
-		    VALUE, RARRAY_LEN(ary) - (beg + len));
-	    RARRAY(ary)->len = alen;
-	}
 	if (rlen > 0) {
 	    MEMMOVE(RARRAY_PTR(ary) + beg, RARRAY_PTR(rpl), VALUE, rlen);
 	}
@@ -1790,7 +1825,12 @@
 	    pos = RARRAY_LEN(ary) + pos;
 	    if (pos < 0) return Qnil;
 	}
-	arg2 = rb_ary_subseq(ary, pos, len);
+	if (len * 2 < RARRAY_LEN(ary)) {
+	    arg2 = rb_ary_subseq_copy(ary, pos, len);
+	}
+	else {
+	    arg2 = rb_ary_subseq(ary, pos, len);
+	}
 	rb_ary_splice(ary, pos, len, Qundef);	/* Qnil/rb_ary_new2(0) */
 	return arg2;
     }

-- 
Yusuke ENDOH <mame / tsg.ne.jp>