summaryrefslogtreecommitdiff
path: root/subversion/bindings/swig/python/tests/mergeinfo.py
blob: 873fc52be7fe84f7e1e702771f186d726e9b3427 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
#
import unittest, os, sys, gc
from svn import core, repos, fs
import utils

class RevRange:
  """ Proxy object for a revision range, used for comparison. """

  def __init__(self, start, end):
    self.start = start
    self.end = end

def get_svn_merge_range_t_objects():
  """Returns a list 'svn_merge_range_t' objects being tracked by the
     garbage collector, used for detecting memory leaks."""
  return [
    o for o in gc.get_objects()
      if getattr(o, '__class__', None) is not None and
        o.__class__.__name__ == 'svn_merge_range_t'
  ]

class SubversionMergeinfoTestCase(unittest.TestCase):
  """Test cases for mergeinfo"""

  # Some textual mergeinfo.
  TEXT_MERGEINFO1 = b"/trunk:3-9,27,42*"
  TEXT_MERGEINFO2 = b"/trunk:27-29,41-43*"

  # Meta data used in conjunction with this mergeinfo.
  MERGEINFO_SRC = b"/trunk"
  MERGEINFO_NBR_REV_RANGES = 3

  def setUp(self):
    """Load the mergeinfo-full Subversion repository.  This dumpfile is
       created by dumping the repository generated for command line log
       tests 16.  If it needs to be updated (mergeinfo format changes, for
       example), we can go there to get a new version."""
    self.temper = utils.Temper()
    (self.repos, _, _) = self.temper.alloc_known_repo('data/mergeinfo.dump',
                                                      suffix='-mergeinfo')
    self.fs = repos.fs(self.repos)
    self.rev = fs.youngest_rev(self.fs)

  def tearDown(self):
    del self.fs
    del self.repos
    self.temper.cleanup()

  def test_mergeinfo_parse(self):
    """Test svn_mergeinfo_parse()"""
    mergeinfo = core.svn_mergeinfo_parse(self.TEXT_MERGEINFO1)
    self.inspect_mergeinfo_dict(mergeinfo, self.MERGEINFO_SRC,
                                self.MERGEINFO_NBR_REV_RANGES)

  def test_rangelist_merge(self):
    mergeinfo1 = core.svn_mergeinfo_parse(self.TEXT_MERGEINFO1)
    mergeinfo2 = core.svn_mergeinfo_parse(self.TEXT_MERGEINFO2)
    rangelist1 = mergeinfo1.get(self.MERGEINFO_SRC)
    rangelist2 = mergeinfo2.get(self.MERGEINFO_SRC)
    rangelist3 = core.svn_rangelist_merge(rangelist1, rangelist2)
    self.inspect_rangelist_tuple(rangelist3, 3)

  def test_mergeinfo_merge(self):
    """Test svn_mergeinfo_merge()"""
    mergeinfo1 = core.svn_mergeinfo_parse(self.TEXT_MERGEINFO1)
    mergeinfo2 = core.svn_mergeinfo_parse(self.TEXT_MERGEINFO2)
    mergeinfo3 = core.svn_mergeinfo_merge(mergeinfo1, mergeinfo2)
    self.inspect_mergeinfo_dict(mergeinfo3, self.MERGEINFO_SRC,
                                self.MERGEINFO_NBR_REV_RANGES)

  def test_rangelist_reverse(self):
    mergeinfo = core.svn_mergeinfo_parse(self.TEXT_MERGEINFO1)
    rangelist = mergeinfo.get(self.MERGEINFO_SRC)
    reversed_rl = core.svn_rangelist_reverse(rangelist)
    expected_ranges = ((42, 41), (27, 26), (9, 2))
    for i in range(0, len(reversed_rl)):
      self.assertEqual(reversed_rl[i].start, expected_ranges[i][0],
                        "Unexpected range start: %d" % reversed_rl[i].start)
      self.assertEqual(reversed_rl[i].end, expected_ranges[i][1],
                        "Unexpected range end: %d" % reversed_rl[i].end)

  def test_mergeinfo_sort(self):
    mergeinfo = core.svn_mergeinfo_parse(self.TEXT_MERGEINFO1)

    # Swap the order of two revision ranges to misorder the contents
    # of a rangelist.
    rangelist = mergeinfo.get(self.MERGEINFO_SRC)
    rev_range = rangelist[0]
    rangelist[0] = rangelist[1]
    rangelist[1] = rev_range

    mergeinfo = core.svn_mergeinfo_sort(mergeinfo)
    self.inspect_mergeinfo_dict(mergeinfo, self.MERGEINFO_SRC,
                                self.MERGEINFO_NBR_REV_RANGES)

  def test_mergeinfo_get(self):
    mergeinfo = repos.fs_get_mergeinfo(self.repos, [b'/trunk'], self.rev,
                                       core.svn_mergeinfo_inherited,
                                       False, None, None)
    expected_mergeinfo = \
      { b'/trunk' :
          { b'/branches/a' : [RevRange(2, 11)],
            b'/branches/b' : [RevRange(9, 13)],
            b'/branches/c' : [RevRange(2, 16)],
            b'/trunk'      : [RevRange(1, 9)],  },
      }
    self.compare_mergeinfo_catalogs(mergeinfo, expected_mergeinfo)

  def test_mergeinfo_leakage__incorrect_range_t_refcounts(self):
    """Ensure that the ref counts on svn_merge_range_t objects returned by
       svn_mergeinfo_parse() are correct."""
    # When reference counting is working properly, each svn_merge_range_t in
    # the returned mergeinfo will have a ref count of 1...
    mergeinfo = core.svn_mergeinfo_parse(self.TEXT_MERGEINFO1)
    for (path, rangelist) in core._as_list(mergeinfo.items()):
      # ....and now 2 (incref during iteration of rangelist)

      for (i, r) in enumerate(rangelist):
        # ....and now 3 (incref during iteration of each range object)

        refcount = sys.getrefcount(r)
        # ....and finally, 4 (getrefcount() also increfs)
        expected = 4

        # Note: if path and index are not '/trunk' and 0 respectively, then
        # only some of the range objects are leaking, which is, as far as
        # leaks go, even more impressive.
        self.assertEqual(refcount, expected, (
          "Memory leak!  Expected a ref count of %d for svn_merge_range_t "
          "object, but got %d instead (path: %s, index: %d).  Probable "
          "cause: incorrect Py_INCREF/Py_DECREF usage in libsvn_swig_py/"
          "swigutil_py.c." % (expected, refcount, path, i)))

    del mergeinfo
    gc.collect()

  def test_mergeinfo_leakage__lingering_range_t_objects_after_del(self):
    """Ensure that there are no svn_merge_range_t objects being tracked by
       the garbage collector after we explicitly `del` the results returned
       by svn_mergeinfo_parse().  We call gc.collect() to force an explicit
       garbage collection cycle after the `del`;
       if our reference counts are correct, the allocated svn_merge_range_t
       objects will be garbage collected and thus, not appear in the list of
       objects returned by gc.get_objects()."""
    mergeinfo = core.svn_mergeinfo_parse(self.TEXT_MERGEINFO1)
    del mergeinfo
    gc.collect()
    lingering = get_svn_merge_range_t_objects()
    self.assertEqual(lingering, list(), (
      "Memory leak!  Found lingering svn_merge_range_t objects left over from "
      "our call to svn_mergeinfo_parse(), even though we explicitly deleted "
      "the returned mergeinfo object.  Probable cause: incorrect Py_INCREF/"
      "Py_DECREF usage in libsvn_swig_py/swigutil_py.c.  Lingering objects:\n"
      "%s" % lingering))

  def inspect_mergeinfo_dict(self, mergeinfo, merge_source, nbr_rev_ranges):
    rangelist = mergeinfo.get(merge_source)
    self.inspect_rangelist_tuple(rangelist, nbr_rev_ranges)

  def inspect_rangelist_tuple(self, rangelist, nbr_rev_ranges):
    self.assertTrue(rangelist is not None,
                 "Rangelist for '%s' not parsed" % self.MERGEINFO_SRC)
    self.assertEqual(len(rangelist), nbr_rev_ranges,
                      "Wrong number of revision ranges parsed")
    self.assertEqual(rangelist[0].inheritable, True,
                      "Unexpected revision range 'non-inheritable' flag: %s" %
                      rangelist[0].inheritable)
    self.assertEqual(rangelist[1].start, 26,
                      "Unexpected revision range end: %d" % rangelist[1].start)
    self.assertEqual(rangelist[2].inheritable, False,
                      "Missing revision range 'non-inheritable' flag")

  def compare_mergeinfo_catalogs(self, catalog1, catalog2):
    keys1 = sorted(catalog1.keys())
    keys2 = sorted(catalog2.keys())
    self.assertEqual(keys1, keys2)

    for k in catalog1:
        self.compare_mergeinfos(catalog1[k], catalog2[k])

  def compare_mergeinfos(self, mergeinfo1, mergeinfo2):
    keys1 = sorted(mergeinfo1.keys())
    keys2 = sorted(mergeinfo2.keys())
    self.assertEqual(keys1, keys2)

    for k in mergeinfo1:
        self.compare_rangelists(mergeinfo1[k], mergeinfo2[k])

  def compare_rangelists(self, rangelist1, rangelist2):
    self.assertEqual(len(rangelist1), len(rangelist2))
    for r1, r2 in zip(rangelist1, rangelist2):
        self.assertEqual(r1.start, r2.start)
        self.assertEqual(r1.end, r2.end)


def suite():
    return unittest.defaultTestLoader.loadTestsFromTestCase(
      SubversionMergeinfoTestCase)

if __name__ == '__main__':
    runner = unittest.TextTestRunner()
    runner.run(suite())