#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Pavel Vondřička <[email protected]>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# dated June, 1991.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

import argparse
from pathlib import Path
import sys
import json
import html
from operator import itemgetter
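
# Illustrative span shape consumed by merge_xml() below (an assumption inferred from
# the keys it accesses; the authoritative description of the dict contents is produced
# by `xml2standoff`):
#   {'name': 'p', 'contents': 'id="p1"', 'csep': ' ',  # start tag name, attributes, separator
#    'start': 0, 'end': 120,                           # character offsets into the plain text
#    'level': 1, 'order': 3}                           # nesting depth and document order
# Optional keys such as 'text', 'tail', 'cut' and 'skip' are handled when present.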


def merge_xml(text, annotation, out):
"""
Merge plain text with stand-off annotation into XML
Parameters:
-----------
text : str
original plain text
annotation : list of dicts
        list of annotation spans (either from the original XML alone or already merged with the tagger annotation)
out : file object
output file to write the resulting XML into
"""
# current character position in plain text
curpos = 0
# heap of unclosed/open element spans
openels = []
# next element/span to be closed
nextclose = None
# length of the plain text
textlength = len(text)
# length of the annotation list
annlength = len(annotation)
anncnt = 0
# iterate over all annotation spans (or until everything is closed)
while anncnt < annlength or len(openels):
# is there still any annotation available or have we reached its end?
if anncnt < annlength:
i = annotation[anncnt]
anncnt += 1
else:
# use dummy span "starting" at the end of the plain text: we need to seek there if there is nothing else left
i = {'start': textlength, 'level': 0}
# got out-of-order element: fail, we cannot fix it anymore (but this should never happen if annotation is sorted!)
if i['start'] < curpos:
raise Exception("Element '{0}' has started before position {1}.".format(i, curpos))
# while seeking to the start of the current span, write out all remaining text segments and close all former spans ending before this one starts
while i['start'] >= curpos and (nextclose or len(openels)):
# fetch next open span to be closed
if not nextclose:
nextclose = openels.pop()
# plain text EOF reached before end of the last element span? (should not happen)
if textlength < nextclose['end']:
                # this is the result of a broken annotation OR an incomplete plain text file!
raise Exception("Element '{0}' ends beyond the end of the text file (length {1}).".format(i, textlength))
# write out next remaining text segment and the end tag for the span to be closed next
if (nextclose['end'] <= i['start']) and (nextclose['level'] >= i['level']):
topos = nextclose['end']
cut = nextclose['cut'] if 'cut' in nextclose else 0
# write out remaining plain text segment
out.write(html.escape(text[curpos:topos-cut]))
# write out end tag
out.write("</{0}>".format(nextclose['name']))
if 'tail' in nextclose:
out.write(nextclose['tail'])
curpos = topos
nextclose = None
else:
break
topos = i['start']
# write out remaining plain text segment
out.write(html.escape(text[curpos:topos]))
# move current position pointer to the start of the current element/span
curpos = topos
# write out the current element's start tag (or comment or processing instruction)
if 'name' in i:
el = i['name']
if len(i["contents"]):
if el == '!--':
el += i["contents"]
else:
el += i.get("csep", ' ') + i["contents"]
elif len(i.get("csep",'')):
el += i["csep"]
out.write("<{0}>".format(el))
if 'text' in i:
out.write(i['text'])
# if this is not a zero length element: append it to the heap of open elements to be closed later
if (i['end'] != curpos or
(not i['name'].endswith('/') and not i['contents'].endswith('/')
and not (i['name'].startswith('!') or i['name'].startswith('?')))):
openels.append(i)
# also return the last fetched unclosed element back to the heap!
if nextclose:
openels.append(nextclose)
# re-sort the list of open elements by order
openels = sorted(openels, key=itemgetter('order'))
nextclose = None
# for zero length elements try to output the 'tail'
else:
if 'tail' in i:
out.write(i['tail'])
# skip characters to be skipped
if 'skip' in i:
curpos += i['skip']
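

# A minimal usage sketch of merge_xml() (hypothetical data, kept in a comment so that
# nothing extra runs when the script is executed):
#   import io
#   span = {'name': 'doc', 'contents': '', 'start': 0, 'end': 11, 'level': 0, 'order': 0}
#   buf = io.StringIO()
#   merge_xml("Hello world", [span], buf)
# should leave buf.getvalue() == '<doc>Hello world</doc>'.
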
def merge_annotations(origann, newann, breakable=None, allowmerge=False, no_betweenspan_elements=False):
"""
Merge original annotation with a new annotation into one stream
(for origann / newann / outann: see `xml2standoff`/`split_xml` for more details on the span description,
i.e. the dict contents)
Parameters:
-----------
origann : list of dicts
        original list of spans (XML annotation, possibly already merged with sentence segmentation)
newann : list of dicts
new list of spans to be merged with the original one (tagger annotation)
breakable : list of strings or None
        if this is a list, issue a warning whenever a span (whose name is not in this list) is broken
by a new span (Default: None [i.e. no warnings at all])
allowmerge : bool
allow merge of spans with the same name, start and end position (Default: False)
no_betweenspan_elements : bool
do NOT restart broken elements between spans of the new annotation if they end within the following span again (usually pointless highlighting between sentences)
Returns:
--------
outann : list of dicts
merged list of spans (merged annotation)
"""
# sort current annotation spans by their order
origann = sorted(origann, key=itemgetter('order'))
# resulting merged list of annotation spans
outann = []
    # pointer into the original (sorted) annotation list
xptr = 0
# heap of open spans
openspans = []
restartspans = []
order = 0
# iterate over the new annotation spans
for t in newann:
# reopen spans that started inside the previous new span and were therefore broken
for x in restartspans:
order += 1
x['order'] = order
if not no_betweenspan_elements or x['end']>=t['end']:
# really append only if it does not end within the current new span again (or if requested)
outann.append(x)
restartspans = []
inbuffer = []
lastlevel = 0
        # A) first check whether any of the previously opened spans ends inside the new span
stillopen = []
for x in openspans:
if x['end'] <= t['start']: # forget about spans ending before the new span, they do not cause problems
continue
if x['end'] < t['end']: # span ending inside the new span must be split:
if breakable and x['name'] not in breakable:
print("Warning: Element <{0}> broken by element <{1}> at text position {2}."
.format(x['name'], t['name'], t['start']), file=sys.stderr)
xc = x.copy()
x['end'] = t['start'] # a) cut the original span end at the start of the new span
xc['start'] = t['start'] # b) the rest has to be placed inside the new span (a copy restarted inside)
inbuffer.append(xc)
elif x['end'] > t['end']: # spans that just go on... keep them for later revision
stillopen.append(x)
if x['end'] >= t['end']:
lastlevel = x['level']
openspans = stillopen
        # B) now handle the remaining original spans that affect the new span (starting before or inside it)
# 1) spans starting before the new span starts
while xptr < len(origann) and origann[xptr]['start'] <= t['start']:
x = origann[xptr]
if x['end'] <= t['start']: # spans preceding the new span completely
order += 1
x['order'] = order
outann.append(x)
elif x['end'] < t['end']: # spans ending inside the new span:
if x['start'] == t['start']: # a) span to be placed completely inside the new span
inbuffer.append(x)
else: # b) span to be split:
if breakable and x['name'] not in breakable:
print("Warning: Element <{0}> broken by element <{1}> at text position {2}."
.format(x['name'], t['name'], t['start']), file=sys.stderr)
xc = x.copy()
x['end'] = t['start'] # b1) cut the original span end at the start of the current one
order += 1
x['order'] = order
outann.append(x)
xc['start'] = t['start'] # b2) the rest (copy) to be placed inside the new span
inbuffer.append(xc)
else:
if allowmerge and x['end'] == t['end'] and x['name'] == t['name']: # merge spans with the same name and length
t['contents'] = x['contents'] + t.get('csep',' ') + t['contents']
t['csep'] = x.get('csep',' ')
else:
lastlevel = x['level']
if x['end'] != t['end']: # spans not ending yet must be revised by later new spans as well
openspans.append(x)
order += 1
x['order'] = order
outann.append(x)
xptr += 1
# 2) spans starting inside the new span
while xptr < len(origann) and origann[xptr]['start'] < t['end']:
x = origann[xptr]
if x['end'] <= t['end']: # span to be placed inside the new span
inbuffer.append(x)
else: # span to be split:
if breakable and x['name'] not in breakable:
print("Warning: Element <{0}> broken by element <{1}> at text position {2}."
.format(x['name'], t['name'], t['end']), file=sys.stderr)
xc = x.copy() # a) part to be placed inside the new span
xc['end'] = t['end']
inbuffer.append(xc)
# b) rest to be cut and restarted after the new span
x['start'] = t['end']
restartspans.append(x)
openspans.append(x)
xptr += 1
# C) now append the new span itself
t['level'] = lastlevel + 1
order += 1
t['order'] = order
outann.append(t)
baselevel = 0
# D) add spans to be placed inside the new span
if len(inbuffer):
baselevel = inbuffer[0]['level']
for ispan in inbuffer:
ispan['level'] = t['level'] + 1 + (ispan['level'] - baselevel)
order += 1
ispan['order'] = order
outann.append(ispan)
return outann
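

# Illustrative behaviour of merge_annotations() (hypothetical spans): if the original
# annotation contains a <hi> span covering characters 3-9 and the new annotation
# contains sentence spans <s> covering 0-5 and 6-12, the <hi> span is split at the
# sentence boundary, and the merged list later serializes (via merge_xml) roughly as
#   <s>...<hi>..</hi></s><hi>.</hi><s><hi>...</hi>...</s>
# The short <hi> piece between the two sentences is dropped when
# no_betweenspan_elements is True, which the main script passes for the sentence
# merge unless -kb/--keep-between-sentences is given.
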
if __name__ == '__main__':
"""
Re-create XML from a plain text file and its standoff annotation(s)
===================================================================
    Input: reference plain text file name
        - Expects a stand-off annotation file with the same base name and the extension `.json`
        - If a second annotation file with the same base name and the extension `.ann.json` (created by `ann2standoff`)
          is found, the two annotations are merged and both are applied
Output: creates a new XML file with the same base name as the input file(s) and the extension `.ann.xml`
    Options: `-t` to ignore sentence boundaries from the `.ann.json` annotation (e.g. for InterCorp)
             `-Wb <elements>` to issue a warning whenever the annotation breaks an element other than
                  one of the listed ones (comma-separated list of element names, e.g. `-Wb hi,stage,foreign`);
                  no warnings are issued if `-Wb` is omitted
             `-te <element_name>` Name of the XML element for tokens.
                  Default: 'w'.
             `-se <element_name>` Name of the XML element for sentences.
                  Default: 's'.
             `-kb` Keep broken elements also between sentences (usually pointless for highlighting etc.)
"""
parser = argparse.ArgumentParser(description="Re-create XML from a plain text file and its standoff annotation(s)")
parser.add_argument("infile", help="input TXT file name")
parser.add_argument("-t", "--tokens-only", help="ignore sentence boundaries from the `.ann.json` annotation", action="store_true")
parser.add_argument("-Wb", "--warn-breaking-except", type=str,
                        help="issue a warning when breaking an element other than the specified ones (comma-separated list of element names, e.g. `-Wb hi,stage,foreign`)")
parser.add_argument("-te", "--token-element", help="name of token element", type=str, default='w')
parser.add_argument("-se", "--sentence-element", help="name of sentence element", type=str, default='s')
parser.add_argument("-kb", "--keep-between-sentences", help="keep broken elements between sentences", action="store_true")
args = parser.parse_args()
breakable = args.warn_breaking_except.split(',') if args.warn_breaking_except else None
# input/output files
txtin = Path(args.infile)
jsonin = txtin.with_suffix('.json')
annjsonin = txtin.with_suffix('.ann.json')
xmlout = txtin.with_suffix('.ann.xml')
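    # Example (hypothetical file names): for `./standoff2xml book.txt -Wb hi,stage,foreign`,
    # txtin is `book.txt`, jsonin `book.json` (from `xml2standoff`), annjsonin
    # `book.ann.json` (from `ann2standoff`, optional), and the output goes to `book.ann.xml`.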
    # read the original plain text; do NOT let Python normalize line-break characters (its default behaviour)!
with txtin.open(encoding='utf-8', newline='') as f:
text = f.read()
# read original XML mark-up description/annotation
with open(jsonin, encoding='utf-8') as jsonfile:
xmlann = json.load(jsonfile)
# read tagger annotation (if found)
if annjsonin.exists():
with annjsonin.open(encoding='utf-8') as jsonfile:
annotation = json.load(jsonfile)
# extract word-level/token annotation (tagging)
tokens = [i for i in annotation if i['name'] == args.token_element]
        # extract sentence segmentation (unless it should be ignored)
if not args.tokens_only:
sentences = [i for i in annotation if i['name'] == args.sentence_element]
# merge sentence segmentation (if any) with the original XML mark-up
if len(sentences):
xmlann = merge_annotations(xmlann, sentences, breakable, no_betweenspan_elements=not args.keep_between_sentences)
# merge with the token level annotation
xmlann = merge_annotations(xmlann, tokens, breakable)
else:
        # no tagger annotation found: just sort by the XML element `order` attribute
        # (`merge_annotations` already returns its output in order when it is used)
xmlann = sorted(xmlann, key=itemgetter('order'))
with xmlout.open('w', encoding='utf-8') as xmlfile:
# merge original plaintext with the annotation and write out
merge_xml(text, xmlann, xmlfile)