"""Test script to process an iDigBio DWCA into something we can parse."""
import argparse
import io
import zipfile
import defusedxml.ElementTree as ET
from lmpy.point import PointDwcaReader
from lmpy.tools._config_parser import _process_arguments
# DWCA Tag Constants
[docs]CORE_TAG = '{http://rs.tdwg.org/dwc/text/}core'
[docs]FIELD_TAG = '{http://rs.tdwg.org/dwc/text/}field'
[docs]FILES_TAG = '{http://rs.tdwg.org/dwc/text/}files'
[docs]ID_TAG = '{http://rs.tdwg.org/dwc/text/}id'
[docs]LOCATION_TAG = '{http://rs.tdwg.org/dwc/text/}location'
[docs]EXTENSION_TAG = '{http://rs.tdwg.org/dwc/text/}extension'
[docs]ROW_TYPE_ATT = 'rowType'
[docs]OCCURRENCE_ROW_TYPE = 'http://rs.tdwg.org/dwc/terms/Occurrence'
[docs]DELIMITED_TERMS = [
'http://portal.idigbio.org/terms/flags',
'http://portal.idigbio.org/terms/recordIds',
]
[docs]DELIMITED_BY_ATT = 'delimitedBy'
# .....................................................................................
# .....................................................................................
[docs]def process_idb_dwca(in_zipfile, out_zipfile):
"""Process an idigbio zipfile.
Args:
in_zipfile (str): File path to an input DWCA zip file.
out_zipfile (str): File path to write the output DWCA zip file.
"""
# Open zip files
dwca_in = zipfile.ZipFile(in_zipfile, mode='r')
dwca_out = zipfile.ZipFile(out_zipfile, mode='w')
# Read and prcoess meta.xml
meta_xml_contents = io.TextIOWrapper(dwca_in.open('meta.xml')).read()
new_meta_xml_contents = process_meta_xml(meta_xml_contents)
dwca_out.writestr('meta.xml', new_meta_xml_contents)
# Read meta.xml
# Correct element tag
# Add delimiter to fields?
# Open occurrence.csv
in_occ = io.TextIOWrapper(dwca_in.open('occurrence.csv'))
i = 0
trip_quote = '"' + '""' # So python doesn't interpret as block comment
quad_quote = '""' + '""'
trip_quote_replace = '"' + "'"
with open('temp_occ.csv', mode='wt') as temp_occ:
for line in in_occ:
i += 1
# Remove double-double-quotes
mod_line = (
line.replace(quad_quote, '""')
.replace(trip_quote, trip_quote_replace)
.replace('""', "'")
)
quote_chunks = mod_line.split('"')
write_line = ''
inside = False
for chunk in quote_chunks:
if chunk.startswith('[') and chunk.endswith(']'):
write_line += (
chunk[1:-1].replace(',', ';').replace("'", '').replace(' ', '')
)
elif chunk.startswith('{') and chunk.endswith('}'): # JSON
write_line += '"' + chunk + '"'
else:
# Quoted section
if inside:
write_line += '"' + chunk + '"'
else:
write_line += chunk
inside = not inside
# Process lists
# while mod_line.find('"[') >= 0:
# pre_idx = mod_line.find('"[')
# post_idx = mod_line[pre_idx:].find(']"') + pre_idx
# pre_chunk = mod_line[:pre_idx]
# list_chunk = mod_line[pre_idx+2:post_idx].replace(',', ';')
# .replace("'", '').replace(' ', '')
# post_chunk = mod_line[post_idx+2:]
# mod_line = pre_chunk + list_chunk + post_chunk
# #mod_line = parts_0[0] + parts_1[0].replace(',', ';')
# .replace("'", '').replace(' ', '') + ']"'.join(parts_1[1:])
# Process json sections (geopoints)
if not write_line.endswith('\n'):
write_line += '\n'
temp_occ.write(write_line)
dwca_out.write('temp_occ.csv', 'occurrence.csv')
in_occ.close()
# Close zip files
dwca_in.close()
dwca_out.close()
# .....................................................................................
[docs]def test_dwca(dwca_filename):
"""Test that a DWCA file can be processed.
Args:
dwca_filename (str): The file path of a DarwinCore Archive to test.
Raises:
ValueError: Raised if points returned is None.
"""
with PointDwcaReader(
dwca_filename, geopoint_term='geoPoint', x_term='lon', y_term='lat'
) as reader:
for points in reader:
if points is None:
raise ValueError('Points is None')
# .....................................................................................
[docs]def main():
"""Main method for script."""
parser = argparse.ArgumentParser()
parser.add_argument('--config_file', type=str, help='Path to configuration file.')
parser.add_argument('in_dwca', type=str, help='Input DWCA zip file.')
parser.add_argument('out_dwca', type=str, help='Output processed DWCA zip file.')
args = _process_arguments(parser, config_arg='config_file')
process_idb_dwca(args.in_dwca, args.out_dwca)
test_dwca(args.out_dwca)
# .....................................................................................
if __name__ == '__main__': # pragma: no cover
main()