# This file is part of scingestor - Scientific Catalog Dataset Ingestor
#
# Copyright (C) 2021-2021 DESY, Jan Kotanski <jkotan@mail.desy.de>
#
# scingestor is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# scingestor is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with scingestor. If not, see <http://www.gnu.org/licenses/>.
#
#
#
import os
import glob
import json
import subprocess
import requests
import time
import enum
import socket
import pathlib
import shutil
from .logger import get_logger


class UpdateStrategy(enum.Enum):
""" Update strategy
"""
#: (:class:`scingestor.datasetIngestor.UpdateStrategy`)
#: leave datasets unchanged
NO = 0
#: (:class:`scingestor.datasetIngestor.UpdateStrategy`) patch datasets
PATCH = 1
#: (:class:`scingestor.datasetIngestor.UpdateStrategy`) recreate datasets
CREATE = 2
#: (:class:`scingestor.datasetIngestor.UpdateStrategy`) patch datasets only
#: if scientificMetadata changed, otherwise recreate datasets
MIXED = 3
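# Example (illustrative): the constructor below resolves the
# configuration value "dataset_update_strategy" to this enum by
# upper-cased name, e.g. UpdateStrategy["mixed".upper()] is
# UpdateStrategy.MIXED.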


class DatasetIngestor:
""" Dataset Ingestor
"""
def __init__(self, configuration,
path, dsfile, idsfile, meta, beamtimefile):
""" constructor
:param configuration: dictionary with the ingestor configuration
:type configuration: :obj:`dict` <:obj:`str`, `any`>
:param path: scan dir path
:type path: :obj:`str`
:param dsfile: file with a dataset list
:type dsfile: :obj:`str`
:param idsfile: file with an ingested dataset list
:type idsfile: :obj:`str`
:param meta: beamtime configuration
:type meta: :obj:`dict` <:obj:`str`, `any`>
:param beamtimefile: beamtime filename
:type beamtimefile: :obj:`str`
"""
#: (:obj:`dict` <:obj:`str`, `any`>) ingestor configuration
self.__config = configuration or {}
#: (:obj:`str`) home directory
self.__homepath = str(pathlib.Path.home())
#: (:obj:`str`) master file extension
self.__ext = 'nxs'
#: (:obj:`str`) plot file extension
self.__plotext = 'png'
#: (:obj:`str`) file with a dataset list
self.__dsfile = dsfile
#: (:obj:`str`) file with an ingested dataset list
self.__idsfile = idsfile
#: (:obj:`str`) file with an ingested dataset tmp list
self.__idsfiletmp = "%s%s" % (idsfile, ".tmp")
#: (:obj:`str`) scan path dir
self.__path = path
#: (:obj:`str`) metadata path dir
self.__metapath = path
#: (:obj:`str`) beamtime id
self.__bid = meta["beamtimeId"]
#: (:obj:`str`) desy proposal id
self.__dpid = meta["proposalId"]
#: (:obj:`str`) beamline name
self.__bl = meta["beamline"]
#: (:obj:`str`) beamtime file name
self.__bfile = beamtimefile
#: (:obj:`dict` <:obj:`str`, `any`>) beamtime metadata
self.__meta = meta
#: (:obj:`str`) hostname
self.__hostname = socket.gethostname()
bpath, _ = os.path.split(beamtimefile)
#: (:obj:`str`) relative scan path to beamtime path
self.__relpath = os.path.relpath(path, bpath)
#: (:obj:`str`) pid prefix
self.__pidprefix = ""
# self.__pidprefix = "10.3204/"
#: (:obj:`str`) username
self.__username = 'ingestor'
#: (:class:`UpdateStrategy`) update strategy
self.__strategy = UpdateStrategy.PATCH
#: (:obj:`str`) ingestor credential
self.__incd = None
#: (:obj:`bool`) relative path in datablock flag
self.__relpath_in_datablock = False
#: (:obj:`str`) scicat url
self.__scicat_url = "http://localhost:3000/api/v3"
#: (:obj:`str`) scicat users login
self.__scicat_users_login = "Users/login"
#: (:obj:`str`) scicat datasets class
self.__scicat_datasets = "Datasets"
#: (:obj:`str`) scicat proposal class
self.__scicat_proposals = "Proposals"
#: (:obj:`str`) scicat datablock class
self.__scicat_datablocks = "OrigDatablocks"
#: (:obj:`str`) scicat attachment class
self.__scicat_attachments = "Datasets/{pid}/Attachments"
#: (:obj:`str`) chmod string for json metadata
self.__chmod = None
#: (:obj:`str`) hidden attributes
self.__hiddenattributes = None
#: (:obj:`str`) attachment signals
self.__attachmentsignals = None
#: (:obj:`str`) attachment axes
self.__attachmentaxes = None
#: (:obj:`str`) attachment frame
self.__attachmentframe = None
#: (:obj:`bool`) ingest attachment flag
self.__ingest_attachment = True
#: (:obj:`bool`) retry failed dataset ingestion on next event
self.__retry_failed_dataset_ingestion = True
#: (:obj:`bool`) retry failed attachment ingestion on next event
self.__retry_failed_attachment_ingestion = False
#: (:obj:`str`) metadata copy map file
self.__copymapfile = None
#: (:obj:`str`) metadata group map file
self.__groupmapfile = None
#: (:obj:`bool`) oned metadata flag
self.__oned = False
#: (:obj:`bool`) raw groups flag
self.__raw_groups = False
#: (:obj:`int`) max oned size of metadata record
self.__max_oned_size = None
#: (:obj:`bool`) override attachment signals flag
self.__override = False
#: (:obj:`bool`) log generator command flag
self.__logcommands = False
#: (:obj:`bool`) empty units flag
self.__emptyunits = True
#: (:obj:`bool`) force measurement keyword flag
self.__forcemeasurementkeyword = True
#: (:obj:`bool`) force generate measurement flag
self.__forcegeneratemeasurement = False
#: (:obj:`bool`) skip multiple datablock ingestion
self.__skip_multi_datablock = False
#: (:obj:`bool`) single datablock ingestion
self.__single_datablock = False
#: (:obj:`bool`) skip multiple attachment ingestion
self.__skip_multi_attachment = False
#: (:obj:`bool`) skip scan dataset ingestion
self.__skip_scan_dataset_ingestion = False
#: (:obj:`int`) maximal counter value for post tries
self.__maxcounter = 100
#: (:obj:`str`) raw dataset scan postfix
self.__scanpostfix = ".scan.json"
#: (:obj:`str`) origin datablock scan postfix
self.__datablockpostfix = ".origdatablock.json"
#: (:obj:`str`) attachment scan postfix
self.__attachmentpostfix = ".attachment.json"
#: (:obj:`str`) nexus dataset shell command
self.__datasetcommandfile = "nxsfileinfo metadata -k4 " \
" -o {metapath}/{scanname}{scanpostfix} " \
" --id-format '{idpattern}'" \
" -z '{measurement}'" \
" -e '{entryname}'" \
" -b {beamtimefile} -p {beamtimeid}/{scanname} " \
" -w {ownergroup}" \
" -c {accessgroups}" \
" {masterfile}"
#: (:obj:`str`) dataset shell command
self.__datasetcommand = "nxsfileinfo metadata -k4 " \
" -o {metapath}/{scanname}{scanpostfix} " \
" --id-format '{idpattern}'" \
" -c {accessgroups}" \
" -w {ownergroup}" \
" -z '{measurement}'" \
" -e '{entryname}'" \
" -b {beamtimefile} -p {beamtimeid}/{scanname}"
#: (:obj:`str`) datablock shell command
self.__datablockcommand = "nxsfileinfo origdatablock " \
" -s *.pyc,*{datablockpostfix},*{scanpostfix}," \
"*{attachmentpostfix},*~ " \
" -r '{dbrelpath}' " \
" -p {pidprefix}{beamtimeid}/{scanname} " \
" -w {ownergroup}" \
" -c {accessgroups}" \
" -o {metapath}/{scanname}{datablockpostfix} "
#: (:obj:`str`) datablock stream shell command
self.__datablockmemcommand = "nxsfileinfo origdatablock " \
" -s *.pyc,*{datablockpostfix},*{scanpostfix}," \
"*{attachmentpostfix},*~ " \
" -w {ownergroup}" \
" -c {accessgroups} " \
" -r '{dbrelpath}' " \
" -p {pidprefix}{beamtimeid}/{scanname} "
#: (:obj:`str`) datablock path postfix
self.__datablockscanpath = " {scanpath}/{scanname} "
#: (:obj:`str`) attachment shell command
self.__attachmentcommand = "nxsfileinfo attachment " \
" -w {ownergroup} -c {accessgroups} " \
" -n '{entryname}'" \
" -o {metapath}/{scanname}{attachmentpostfix} " \
" {plotfile}"
#: (:obj:`str`) last measurement
self.__measurement = ""
#: (:obj:`set`<:obj:`str`>) current measurements
self.__measurements = set()
#: (:obj:`bool`) measurement status
self.__measurement_status = False
#: (:obj:`bool`) call callback after each step
self.__callcallback = False
#: (:obj:`str`) metadata generated shell callback
self.__metadatageneratedcallback = "nxsfileinfo groupmetadata " \
" {lastmeasurement} -m {metapath}/{scanname}{scanpostfix}" \
" -d {metapath}/{scanname}{datablockpostfix}" \
" -a {metapath}/{scanname}{attachmentpostfix}" \
" -o {metapath}/{lastmeasurement}{scanpostfix}" \
" -l {metapath}/{lastmeasurement}{datablockpostfix}" \
" -t {metapath}/{lastmeasurement}{attachmentpostfix}" \
" -p {beamtimeid}/{lastmeasurement} -f -k4 "
#: (:obj:`str`) oned generator switch
self.__oned_switch = " --oned "
#: (:obj:`str`) raw group generator switch
self.__raw_groups_switch = " --raw "
#: (:obj:`str`) max oned size generator switch
self.__max_oned_switch = " --max-oned-size {maxonedsize} "
#: (:obj:`str`) copy map file generator switch
self.__copymapfile_switch = " --copy-map-file {copymapfile} "
#: (:obj:`str`) group map file generator switch
self.__groupmapfile_switch = " --group-map-file {groupmapfile} "
#: (:obj:`str`) empty units generator switch
self.__emptyunits_switch = " --add-empty-units "
#: (:obj:`str`) chmod generator switch
self.__chmod_switch = " -x {chmod} "
#: (:obj:`str`) hidden attributes generator switch
self.__hiddenattributes_switch = " -n {hiddenattributes} "
#: (:obj:`str`) relpath generator switch
self.__relpath_switch = " -r {relpath} "
#: (:obj:`str`) attachment signals generator switch
self.__attachmentsignals_switch = " -s {signals} "
#: (:obj:`str`) attachment axes generator switch
self.__attachmentaxes_switch = " -e {axes} "
#: (:obj:`str`) attachment frame generator switch
self.__attachmentframe_switch = " -m {frame} "
#: (:obj:`str`) attachment override signals switch
self.__attachmentoverride_switch = " --override "
#: (:obj:`dict` <:obj:`str`, :obj:`str`>) request headers
self.__headers = {'Content-Type': 'application/json',
'Accept': 'application/json'}
#: (:obj:`list`<:obj:`str`>) metadata keywords without checks
self.__withoutsm = [
"techniques",
"classification",
"createdBy",
"updatedBy",
"datasetlifecycle",
"numberOfFiles",
"size",
"createdAt",
"updatedAt",
"history",
"creationTime",
"version",
"scientificMetadata",
"endTime"
]
#: (:obj:`list`<:obj:`str`>) ingested scan names
self.__sc_ingested = []
#: (:obj:`list`<:obj:`str`>) waiting scan names
self.__sc_waiting = []
#: (:obj:`dict`<:obj:`str`, :obj:`list`<:obj:`str`>>)
#: ingested scan names
self.__sc_ingested_map = {}
#: (:obj:`dict`<:obj:`str`, :obj:`list`<:obj:`str`>>)
#: semi-ingested scan names
self.__sc_seingested_map = {}
#: (:obj:`list` <:obj:`str`>) master file extension list
self.__master_file_extension_list = ["nxs", "h5", "ndf", "nx", "fio"]
#: (:obj:`list` <:obj:`str`>) plot file extension list
self.__plot_file_extension_list = \
["png", "nxs", "h5", "ndf", "nx", "fio"]
#: (:obj:`str`) proposalId pattern
self.__idpattern = "{proposalId}.{beamtimeId}"
# self.__idpattern = "{beamtimeId}"
if "scicat_proposal_id_pattern" in self.__config.keys():
self.__idpattern = \
self.__config["scicat_proposal_id_pattern"].replace(
"{proposalid}", "{proposalId}").replace(
"{beamtimeid}", "{beamtimeId}")
if "master_file_extension_list" in self.__config.keys() \
and isinstance(self.__config["master_file_extension_list"], list):
self.__master_file_extension_list = []
for ext in self.__config["master_file_extension_list"]:
if ext:
self.__master_file_extension_list.append(ext)
if self.__master_file_extension_list:
self.__ext = self.__master_file_extension_list[0]
if "plot_file_extension_list" in self.__config.keys() \
and isinstance(self.__config["plot_file_extension_list"], list):
self.__plot_file_extension_list = []
for ext in self.__config["plot_file_extension_list"]:
if ext:
self.__plot_file_extension_list.append(ext)
if self.__plot_file_extension_list:
self.__plotext = self.__plot_file_extension_list[0]
#: (:obj:`str`) access groups
self.__accessgroups = \
"{beamtimeid}-dmgt,{beamtimeid}-clbt,{beamtimeid}-part," \
"{beamline}dmgt,{beamline}staff".format(
beamtimeid=self.__bid, beamline=self.__bl)
if "accessGroups" in self.__meta:
self.__accessgroups = ",".join(self.__meta["accessGroups"])
#: (:obj:`str`) owner group
self.__ownergroup = \
"{beamtimeid}-dmgt".format(
beamtimeid=self.__bid)
if "ownerGroup" in self.__meta:
self.__ownergroup = self.__meta["ownerGroup"]
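# Example (illustrative): for beamtimeid="12345678" and beamline="p09"
# the defaults above expand to the access groups
#   "12345678-dmgt,12345678-clbt,12345678-part,p09dmgt,p09staff"
# and the owner group "12345678-dmgt", unless the beamtime metadata
# overrides them.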
#: (:obj:`bool`) metadata in log dir flag
self.__meta_in_var_dir = True
if "metadata_in_var_dir" in self.__config.keys():
self.__meta_in_var_dir = self.__config["metadata_in_var_dir"]
#: (:obj:`str`) ingestor log directory
self.__var_dir = ""
if "ingestor_var_dir" in self.__config.keys():
self.__var_dir = str(
self.__config["ingestor_var_dir"]).format(
beamtimeid=self.__bid,
homepath=self.__homepath)
if self.__var_dir == "/":
self.__var_dir = ""
if self.__meta_in_var_dir and self.__var_dir:
self.__metapath = "%s%s" % (self.__var_dir, self.__metapath)
if not os.path.isdir(self.__metapath):
os.makedirs(self.__metapath, exist_ok=True)
if "dataset_pid_prefix" in self.__config.keys():
self.__pidprefix = self.__config["dataset_pid_prefix"]
if "ingestor_credential_file" in self.__config.keys():
with open(self.__config["ingestor_credential_file"].format(
homepath=self.__homepath)) as fl:
self.__incd = fl.read().strip()
if "ingestor_username" in self.__config.keys():
self.__username = self.__config["ingestor_username"]
if "dataset_update_strategy" in self.__config.keys():
try:
self.__strategy = UpdateStrategy[
str(self.__config["dataset_update_strategy"]).upper()]
except Exception as e:
get_logger().warning(
'Wrong UpdateStrategy value: %s' % str(e))
if "scicat_url" in self.__config.keys():
self.__scicat_url = self.__config["scicat_url"]
if "scicat_datasets_path" in self.__config.keys():
self.__scicat_datasets = self.__config["scicat_datasets_path"]
if "scicat_proposals_path" in self.__config.keys():
self.__scicat_proposals = self.__config["scicat_proposals_path"]
if "scicat_datablocks_path" in self.__config.keys():
self.__scicat_datablocks = self.__config["scicat_datablocks_path"]
if "scicat_attachments_path" in self.__config.keys():
self.__scicat_attachments = \
self.__config["scicat_attachments_path"]
if "scicat_users_login_path" in self.__config.keys():
self.__scicat_users_login = \
self.__config["scicat_users_login_path"]
if "relative_path_in_datablock" in self.__config.keys():
self.__relpath_in_datablock = \
self.__config["relative_path_in_datablock"]
if "chmod_json_files" in self.__config.keys():
self.__chmod = self.__config["chmod_json_files"]
if "hidden_attributes" in self.__config.keys():
self.__hiddenattributes = self.__config["hidden_attributes"]
if "attachment_signal_names" in self.__config.keys():
self.__attachmentsignals = self.__config["attachment_signal_names"]
if "attachment_axes_names" in self.__config.keys():
self.__attachmentaxes = self.__config["attachment_axes_names"]
if "attachment_image_frame_number" in self.__config.keys():
self.__attachmentframe = \
self.__config["attachment_image_frame_number"]
if "metadata_copy_map_file" in self.__config.keys():
self.__copymapfile = \
self.__config["metadata_copy_map_file"].format(
homepath=self.__homepath)
if "metadata_group_map_file" in self.__config.keys():
self.__groupmapfile = \
self.__config["metadata_group_map_file"].format(
homepath=self.__homepath)
if "oned_in_metadata" in self.__config.keys():
self.__oned = self.__config["oned_in_metadata"]
if "raw_metadata_callback" in self.__config.keys():
self.__raw_groups = self.__config["raw_metadata_callback"]
if "max_oned_size" in self.__config.keys():
self.__max_oned_size = self.__config["max_oned_size"]
if "override_attachment_signals" in self.__config.keys():
self.__override = self.__config["override_attachment_signals"]
if "log_generator_commands" in self.__config.keys():
self.__logcommands = self.__config["log_generator_commands"]
if "ingest_dataset_attachment" in self.__config.keys():
self.__ingest_attachment = \
self.__config["ingest_dataset_attachment"]
if "retry_failed_dataset_ingestion" in self.__config.keys():
self.__retry_failed_dataset_ingestion = \
self.__config["retry_failed_dataset_ingestion"]
if "retry_failed_attachment_ingestion" in self.__config.keys():
self.__retry_failed_attachment_ingestion = \
self.__config["retry_failed_attachment_ingestion"]
if "add_empty_units" in self.__config.keys():
self.__emptyunits = self.__config["add_empty_units"]
if "force_measurement_keyword" in self.__config.keys():
self.__forcemeasurementkeyword = \
self.__config["force_measurement_keyword"]
if "force_generate_measurement" in self.__config.keys():
self.__forcegeneratemeasurement = \
self.__config["force_generate_measurement"]
if "skip_multi_datablock_ingestion" in self.__config.keys():
self.__skip_multi_datablock = \
self.__config["skip_multi_datablock_ingestion"]
if "single_datablock_ingestion" in self.__config.keys():
self.__single_datablock = \
self.__config["single_datablock_ingestion"]
if "skip_multi_attachment_ingestion" in self.__config.keys():
self.__skip_multi_attachment = \
self.__config["skip_multi_attachment_ingestion"]
if "skip_scan_dataset_ingestion" in self.__config.keys():
self.__skip_scan_dataset_ingestion = \
self.__config["skip_scan_dataset_ingestion"]
if "scan_metadata_postfix" in self.__config.keys():
self.__scanpostfix = self.__config["scan_metadata_postfix"]
if "datablock_metadata_postfix" in self.__config.keys():
self.__datablockpostfix = \
self.__config["datablock_metadata_postfix"]
if "attachment_metadata_postfix" in self.__config.keys():
self.__attachmentpostfix = \
self.__config["attachment_metadata_postfix"]
if "file_dataset_metadata_generator" in self.__config.keys():
self.__datasetcommandfile = \
self.__config["file_dataset_metadata_generator"]
if "dataset_metadata_generator" in self.__config.keys():
self.__datasetcommand = \
self.__config["dataset_metadata_generator"]
if "datablock_metadata_generator" in self.__config.keys():
self.__datablockcommand = \
self.__config["datablock_metadata_generator"]
if "datablock_metadata_stream_generator" in self.__config.keys():
self.__datablockmemcommand = \
self.__config["datablock_metadata_stream_generator"]
if "datablock_metadata_generator_scanpath_postfix" \
in self.__config.keys():
self.__datablockscanpath = \
self.__config["datablock_metadata_generator_scanpath_postfix"]
if "attachment_metadata_generator" in self.__config.keys():
self.__attachmentcommand = \
self.__config["attachment_metadata_generator"]
if "call_metadata_generated_callback" in self.__config.keys():
self.__callcallback = bool(
self.__config["call_metadata_generated_callback"])
if "metadata_generated_callback" in self.__config.keys():
self.__metadatageneratedcallback = \
self.__config["metadata_generated_callback"]
if "chmod_generator_switch" in self.__config.keys():
self.__chmod_switch = \
self.__config["chmod_generator_switch"]
if "hidden_attributes_generator_switch" in self.__config.keys():
self.__hiddenattributes_switch = \
self.__config["hidden_attributes_generator_switch"]
if "attachment_signals_generator_switch" in self.__config.keys():
self.__attachmentsignals_switch = \
self.__config["attachment_signals_generator_switch"]
if "attachment_axes_generator_switch" in self.__config.keys():
self.__attachmentaxes_switch = \
self.__config["attachment_axes_generator_switch"]
if "attachment_frame_generator_switch" in self.__config.keys():
self.__attachmentframe_switch = \
self.__config["attachment_frame_generator_switch"]
if "metadata_copy_map_file_generator_switch" in self.__config.keys():
self.__copymapfile_switch = \
self.__config["metadata_copy_map_file_generator_switch"]
if "metadata_group_map_file_generator_switch" in self.__config.keys():
self.__groupmapfile_switch = \
self.__config["metadata_group_map_file_generator_switch"]
if "relative_path_generator_switch" in self.__config.keys():
self.__relpath_switch = \
self.__config["relative_path_generator_switch"]
if "oned_dataset_generator_switch" in self.__config.keys():
self.__oned_switch = \
self.__config["oned_dataset_generator_switch"]
if "raw_metadata_callback_switch" in self.__config.keys():
self.__raw_groups_switch = \
self.__config["raw_metadata_callback_switch"]
if "max_oned_dataset_generator_switch" in self.__config.keys():
self.__max_oned_switch = \
self.__config["max_oned_dataset_generator_switch"]
if "override_attachment_signals_generator_switch" \
in self.__config.keys():
self.__attachmentoverride_switch = \
self.__config["override_attachment_signals_generator_switch"]
if "add_empty_units_generator_switch" in self.__config.keys():
self.__emptyunits_switch = \
self.__config["add_empty_units_generator_switch"]
if not self.__relpath_in_datablock:
if "dataset_metadata_generator" not in self.__config.keys():
self.__datasetcommand = \
self.__datasetcommand + self.__relpath_switch
if "file_dataset_metadata_generator" not in self.__config.keys():
self.__datasetcommandfile = \
self.__datasetcommandfile + self.__relpath_switch
if self.__chmod is not None:
if "dataset_metadata_generator" not in self.__config.keys():
self.__datasetcommand = \
self.__datasetcommand + self.__chmod_switch
if "file_dataset_metadata_generator" not in self.__config.keys():
self.__datasetcommandfile = \
self.__datasetcommandfile + self.__chmod_switch
if "datablock_metadata_generator" not in self.__config.keys():
self.__datablockcommand = \
self.__datablockcommand + self.__chmod_switch
if "datablock_metadata_stream_generator" \
not in self.__config.keys():
self.__datablockmemcommand = \
self.__datablockmemcommand + self.__chmod_switch
if "attachment_metadata_generator" not in self.__config.keys():
self.__attachmentcommand = \
self.__attachmentcommand + self.__chmod_switch
if "metadata_generated_callback" not in self.__config.keys():
self.__metadatageneratedcallback = \
self.__metadatageneratedcallback + self.__chmod_switch
if self.__groupmapfile is not None:
if "metadata_generated_callback" not in self.__config.keys():
self.__metadatageneratedcallback = \
self.__metadatageneratedcallback + \
self.__groupmapfile_switch
if self.__raw_groups:
if "metadata_generated_callback" not in self.__config.keys():
self.__metadatageneratedcallback = \
self.__metadatageneratedcallback + \
self.__raw_groups_switch
if self.__hiddenattributes is not None:
if "dataset_metadata_generator" not in self.__config.keys():
self.__datasetcommand = \
self.__datasetcommand + self.__hiddenattributes_switch
if "file_dataset_metadata_generator" not in self.__config.keys():
self.__datasetcommandfile = \
self.__datasetcommandfile + self.__hiddenattributes_switch
if self.__copymapfile is not None:
if "dataset_metadata_generator" not in self.__config.keys():
self.__datasetcommand = \
self.__datasetcommand + self.__copymapfile_switch
if "file_dataset_metadata_generator" not in self.__config.keys():
self.__datasetcommandfile = \
self.__datasetcommandfile + self.__copymapfile_switch
if self.__oned:
if "dataset_metadata_generator" not in self.__config.keys():
self.__datasetcommand = \
self.__datasetcommand + self.__oned_switch
if "file_dataset_metadata_generator" not in self.__config.keys():
self.__datasetcommandfile = \
self.__datasetcommandfile + self.__oned_switch
if self.__oned and self.__max_oned_size:
if "dataset_metadata_generator" not in self.__config.keys():
self.__datasetcommand = \
self.__datasetcommand + self.__max_oned_switch
if "file_dataset_metadata_generator" not in self.__config.keys():
self.__datasetcommandfile = \
self.__datasetcommandfile + self.__max_oned_switch
if self.__emptyunits:
if "dataset_metadata_generator" not in self.__config.keys():
self.__datasetcommand = \
self.__datasetcommand + self.__emptyunits_switch
if "file_dataset_metadata_generator" not in self.__config.keys():
self.__datasetcommandfile = \
self.__datasetcommandfile + self.__emptyunits_switch
if self.__attachmentsignals is not None:
if "attachment_metadata_generator" not in self.__config.keys():
self.__attachmentcommand = \
self.__attachmentcommand + self.__attachmentsignals_switch
if self.__attachmentaxes is not None:
if "attachment_metadata_generator" not in self.__config.keys():
self.__attachmentcommand = \
self.__attachmentcommand + self.__attachmentaxes_switch
if self.__attachmentframe is not None:
if "attachment_metadata_generator" not in self.__config.keys():
self.__attachmentcommand = \
self.__attachmentcommand + self.__attachmentframe_switch
if self.__override:
if "attachment_metadata_generator" not in self.__config.keys():
self.__attachmentcommand = \
self.__attachmentcommand + self.__attachmentoverride_switch
if "max_request_tries_number" in self.__config.keys():
try:
self.__maxcounter = int(
self.__config["max_request_tries_number"])
except Exception as e:
get_logger().warning('%s' % (str(e)))
if "request_headers" in self.__config.keys():
try:
self.__headers = dict(
self.__config["request_headers"])
except Exception as e:
get_logger().warning('%s' % (str(e)))
if "metadata_fields_without_checks" in self.__config.keys():
try:
self.__withoutsm = list(
self.__config["metadata_fields_without_checks"])
except Exception as e:
get_logger().warning('%s' % (str(e)))
#: (:obj:`dict` <:obj:`str`, :obj:`str`>) command format parameters
self.__dctfmt = {
"scanname": None,
"chmod": self.__chmod,
"hiddenattributes": self.__hiddenattributes,
"copymapfile": self.__copymapfile,
"plotfile": "",
"masterfile": "",
"scanpath": self.__path,
"metapath": self.__metapath,
"relpath": self.__relpath,
"dbrelpath": "",
"beamtimeid": self.__bid,
"beamline": self.__bl,
"pidprefix": self.__pidprefix,
"beamtimefile": self.__bfile,
"scanpostfix": self.__scanpostfix,
"datablockpostfix": self.__datablockpostfix,
"attachmentpostfix": self.__attachmentpostfix,
"ownergroup": self.__ownergroup,
"accessgroups": self.__accessgroups,
"hostname": self.__hostname,
"homepath": self.__homepath,
"ext": self.__ext,
"plotext": self.__plotext,
"signals": self.__attachmentsignals,
"axes": self.__attachmentaxes,
"frame": self.__attachmentframe,
"maxonedsize": self.__max_oned_size,
"measurement": self.__measurement,
"lastmeasurement": self.__measurement,
"groupmapfile": self.__groupmapfile,
"masterscanname": "",
"entryname": "",
"idpattern": self.__idpattern,
}
self.__dctfmt["masterfile"] = \
"{scanpath}/{masterscanname}.{ext}".format(**self.__dctfmt)
self.__dctfmt["plotfile"] = \
"{scanpath}/{masterscanname}.{plotext}".format(**self.__dctfmt)
get_logger().debug(
'DatasetIngestor: Parameters: %s' % str(self.__dctfmt))
# self.__tokenurl = "http://www-science3d.desy.de:3000/api/v3/" \
# "Users/login"
if not self.__scicat_url.endswith("/"):
self.__scicat_url = self.__scicat_url + "/"
#: (:obj:`str`) token url
self.__tokenurl = self.__scicat_url + self.__scicat_users_login
# get_logger().info(
# 'DatasetIngestor: LOGIN %s' % self.__tokenurl)
#: (:obj:`str`) dataset url
self.__dataseturl = self.__scicat_url + self.__scicat_datasets
# self.__dataseturl = "http://www-science3d.desy.de:3000/api/v3/" \
# "Datasets"
#: (:obj:`str`) proposal url
self.__proposalurl = self.__scicat_url + self.__scicat_proposals
# self.__proposalurl = "http://www-science3d.desy.de:3000/api/v3/" \
# "Proposals"
#: (:obj:`str`) origdatablock url
self.__datablockurl = self.__scicat_url + self.__scicat_datablocks
# self.__datablockurl = "http://www-science3d.desy.de:3000/api/v3/" \
#     "OrigDatablocks"
#: (:obj:`str`) attachment url
self.__attachmenturl = self.__scicat_url + self.__scicat_attachments
# self.__attachmenturl = "http://www-science3d.desy.de:3000/api/v3/" \
#     "Datasets/{pid}/Attachments"
def _generate_rawdataset_metadata(self, scan):
""" generate raw dataset metadata
:param scan: scan name
:type scan: :obj:`str`
:returns: a file name of the generated file
:rtype: :obj:`str`
"""
self.__ext = ""
self.__dctfmt["masterfile"] = \
"{scanpath}/{masterscanname}.{ext}".format(**self.__dctfmt)
for ext in self.__master_file_extension_list:
self.__dctfmt["ext"] = ext
if os.path.isfile(
"{scanpath}/{masterscanname}.{ext}".format(
**self.__dctfmt)):
self.__ext = ext
self.__dctfmt["masterfile"] = \
"{scanpath}/{masterscanname}.{ext}".format(
**self.__dctfmt)
break
else:
for ext in self.__master_file_extension_list:
self.__dctfmt["ext"] = ext
if os.path.isfile(
"{scanpath}/{scanname}/{scanname}.{ext}".
format(**self.__dctfmt)):
self.__ext = ext
self.__dctfmt["masterfile"] = \
"{scanpath}/{scanname}/{scanname}.{ext}".format(
**self.__dctfmt)
break
self.__dctfmt["ext"] = self.__ext
ffname = ""
if self.__ext:
if self.__dctfmt["masterscanname"] != self.__dctfmt["scanname"]:
masterfile = self.__dctfmt["masterfile"]
mdir, mfile = os.path.split(masterfile)
if self.__meta_in_var_dir and self.__var_dir:
mdir = "%s%s" % (self.__var_dir, mdir)
if not os.path.isdir(mdir):
os.makedirs(mdir, exist_ok=True)
fcnt = 1
ffname = os.path.join(
mdir, "_tmp_scingestor_%s_%s" % (fcnt, mfile))
while os.path.isfile(ffname):
fcnt += 1
ffname = os.path.join(
mdir, "_tmp_scingestor_%s_%s" % (fcnt, mfile))
self.__dctfmt["masterfile"] = ffname
shutil.copy(masterfile, self.__dctfmt["masterfile"])
get_logger().info(
'DatasetIngestor: Generating %s metadata: %s %s' % (
self.__ext, scan,
"{metapath}/{scanname}{scanpostfix}".format(
**self.__dctfmt)))
command = self.__datasetcommandfile.format(**self.__dctfmt)
if self.__logcommands:
get_logger().info(
'DatasetIngestor: Generating dataset command: %s ' % (
command))
else:
get_logger().debug(
'DatasetIngestor: Generating dataset command: %s ' % (
command))
subprocess.run(command, shell=True, check=True)
if self.__dctfmt["masterscanname"] != self.__dctfmt["scanname"]:
if os.path.isfile(self.__dctfmt["masterfile"]):
os.remove(self.__dctfmt["masterfile"])
self.__dctfmt["masterfile"] = masterfile
else:
get_logger().info(
'DatasetIngestor: Generating metadata: %s %s' % (
scan,
"{metapath}/{scanname}{scanpostfix}".format(
**self.__dctfmt)))
command = self.__datasetcommand.format(**self.__dctfmt)
if self.__logcommands:
get_logger().info(
'DatasetIngestor: Generating dataset command: %s'
% (command))
else:
get_logger().debug(
'DatasetIngestor: Generating dataset command: %s'
% (command))
subprocess.run(command, shell=True, check=True)
if ffname and os.path.isfile(ffname):
try:
os.remove(ffname)
except Exception as e:
get_logger().warning(
"File %s cannot be removed: %s" % (ffname, str(e)))
rdss = glob.glob(
"{metapath}/{scanname}{scanpostfix}".format(**self.__dctfmt))
if rdss and rdss[0]:
return rdss[0]
return ""
def _generate_origdatablock_metadata(self, scan):
""" generate origdatablock metadata
:param scan: scan name
:type scan: :obj:`str`
:returns: a file name of the generated file
:rtype: :obj:`str`
"""
get_logger().info(
'DatasetIngestor: Generating origdatablock metadata: %s %s' % (
scan,
"{metapath}/{scanname}{datablockpostfix}".format(
**self.__dctfmt)))
cmd = self.__datablockcommand.format(**self.__dctfmt)
sscan = (scan or "").split(" ")
for sc in sscan:
cmd += self.__datablockscanpath.format(
scanpath=self.__dctfmt["scanpath"], scanname=sc)
if self.__logcommands:
get_logger().info(
'DatasetIngestor: Generating origdatablock command: %s' % cmd)
else:
get_logger().debug(
'DatasetIngestor: Generating origdatablock command: %s' % cmd)
subprocess.run(cmd, shell=True, check=True)
odbs = glob.glob(
"{metapath}/{scanname}{datablockpostfix}".format(
**self.__dctfmt))
if odbs and odbs[0]:
return odbs[0]
return ""
def _generate_attachment_metadata(self, scan):
""" generate origdatablock metadata
:param scan: scan name
:type scan: :obj:`str`
:returns: a file name of generate file
:rtype: :obj:`str`
"""
self.__plotext = ""
self.__dctfmt["plotfile"] = \
"{scanpath}/{masterscanname}.{plotext}".format(**self.__dctfmt)
for ext in self.__plot_file_extension_list:
self.__dctfmt["plotext"] = ext
if os.path.isfile(
"{scanpath}/{masterscanname}.{plotext}".format(
**self.__dctfmt)):
self.__plotext = ext
self.__dctfmt["plotfile"] = \
"{scanpath}/{masterscanname}.{plotext}".format(
**self.__dctfmt)
break
else:
for ext in self.__plot_file_extension_list:
self.__dctfmt["plotext"] = ext
if os.path.isfile(
"{scanpath}/{scanname}/{scanname}.{plotext}".
format(**self.__dctfmt)):
self.__plotext = ext
self.__dctfmt["plotfile"] = \
"{scanpath}/{scanname}/{scanname}.{plotext}".format(
**self.__dctfmt)
break
self.__dctfmt["plotext"] = self.__plotext
ffname = ""
if self.__dctfmt["plotext"]:
if self.__dctfmt["masterscanname"] != self.__dctfmt["scanname"]:
plotfile = self.__dctfmt["plotfile"]
mdir, mfile = os.path.split(plotfile)
if self.__meta_in_var_dir and self.__var_dir:
mdir = "%s%s" % (self.__var_dir, mdir)
if not os.path.isdir(mdir):
os.makedirs(mdir, exist_ok=True)
fcnt = 1
ffname = os.path.join(
mdir, "_tmp_scingestor_%s_%s" % (fcnt, mfile))
while os.path.isfile(ffname):
fcnt += 1
ffname = os.path.join(
mdir, "_tmp_scingestor_%s_%s" % (fcnt, mfile))
self.__dctfmt["plotfile"] = ffname
shutil.copy(plotfile, self.__dctfmt["plotfile"])
get_logger().info(
'DatasetIngestor: Generating attachment metadata: %s %s' % (
scan,
"{metapath}/{scanname}{attachmentpostfix}".format(
**self.__dctfmt)))
cmd = self.__attachmentcommand.format(**self.__dctfmt)
if self.__logcommands:
get_logger().info(
'DatasetIngestor: Generating attachment command: %s' % cmd)
else:
get_logger().debug(
'DatasetIngestor: Generating attachment command: %s' % cmd)
subprocess.run(cmd, shell=True, check=True)
if self.__dctfmt["masterscanname"] != self.__dctfmt["scanname"]:
if os.path.isfile(self.__dctfmt["plotfile"]):
os.remove(self.__dctfmt["plotfile"])
self.__dctfmt["plotfile"] = plotfile
if ffname and os.path.isfile(ffname):
try:
os.remove(ffname)
except Exception as e:
get_logger().warning(
"File %s cannot be removed: %s" % (ffname, str(e)))
adss = glob.glob(
"{metapath}/{scanname}{attachmentpostfix}".format(
**self.__dctfmt))
if adss and adss[0]:
return adss[0]
return ""
def _regenerate_origdatablock_metadata(self, scan,
force=False, mfilename=""):
"""regenerate origdatablock metadata
:param scan: scan name
:type scan: :obj:`str`
:param force: force flag
:type force: :obj:`bool`
:param mfilename: metadata file name
:type mfilename: :obj:`str`
:returns: a file name of the generated file
:rtype: :obj:`str`
"""
mfilename = "{metapath}/{scanname}{datablockpostfix}".format(
**self.__dctfmt)
get_logger().info(
'DatasetIngestor: Checking origdatablock metadata: %s %s' % (
scan, mfilename))
# cmd = self.__datablockcommand.format(**self.__dctfmt)
dmeta = None
try:
with open(mfilename, "r") as mf:
meta = mf.read()
dmeta = json.loads(meta)
except Exception as e:
if not force:
get_logger().warning('%s: %s' % (scan, str(e)))
cmd = self.__datablockmemcommand.format(**self.__dctfmt)
sscan = (scan or "").split(" ")
if self.__datablockscanpath:
dctfmt = dict(self.__dctfmt)
for sc in sscan:
dctfmt["scanname"] = sc
cmd += self.__datablockscanpath.format(**dctfmt)
get_logger().debug(
'DatasetIngestor: Checking origdatablock command: %s ' % cmd)
if self.__logcommands:
get_logger().info(
'DatasetIngestor: Generating origdatablock command: %s'
% cmd)
else:
get_logger().debug(
'DatasetIngestor: Generating origdatablock command: %s'
% cmd)
result = subprocess.run(
cmd, shell=True,
text=True, capture_output=True, check=True)
nwmeta = str(result.stdout)
if dmeta is None:
with open(mfilename, "w") as mf:
mf.write(nwmeta)
else:
try:
dnwmeta = json.loads(nwmeta)
except Exception as e:
get_logger().warning('%s: %s' % (scan, str(e)))
dnwmeta = None
if dnwmeta is not None:
if not self._metadataEqual(dmeta, dnwmeta) or force:
get_logger().info(
'DatasetIngestor: '
'Generating origdatablock metadata: %s %s' % (
scan,
"{metapath}/{scanname}{datablockpostfix}".format(
**self.__dctfmt)))
with open(mfilename, "w") as mf:
mf.write(nwmeta)
odbs = glob.glob(mfilename)
if odbs and odbs[0]:
return odbs[0]
return ""
def _metadataEqual(self, dct, dct2, skip=None, parent=None):
""" compare two dictionaries if metdatdata is equal
:param dct: first metadata dictionary
:type dct: :obj:`dct` <:obj:`str`, `any`>
:param dct2: second metadata dictionary
:type dct2: :obj:`dct` <:obj:`str`, `any`>
:param skip: a list of keywords to skip
:type skip: :obj:`list` <:obj:`str`>
:param parent: the parent metadata dictionary to use in recursion
:type parent: :obj:`dct` <:obj:`str`, `any`>
"""
parent = parent or ""
w1 = [("%s.%s" % (parent, k) if parent else k)
for k in dct.keys()
if (not skip or
(("%s.%s" % (parent, k) if parent else k)
not in skip))]
w2 = [("%s.%s" % (parent, k) if parent else k)
for k in dct2.keys()
if (not skip or
(("%s.%s" % (parent, k) if parent else k)
not in skip))]
if len(w1) != len(w2):
get_logger().debug(
'DatasetIngestor: %s != %s' % (
list(w1), list(w2)))
return False
status = True
for k, v in dct.items():
if parent:
node = "%s.%s" % (parent, k)
else:
node = k
if not skip or node not in skip:
if k not in dct2.keys():
get_logger().debug(
'DatasetIngestor: %s not in %s'
% (k, dct2.keys()))
status = False
break
if isinstance(v, dict):
if not self._metadataEqual(v, dct2[k], skip, node):
status = False
break
else:
if v != dct2[k]:
get_logger().debug(
'DatasetIngestor %s: %s != %s'
% (k, v, dct2[k]))
status = False
break
return status
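# Example (illustrative): nested keys are addressed with dotted names
# in the skip list, e.g.
#   self._metadataEqual({"pid": "a", "size": 1},
#                       {"pid": "a", "size": 2}, skip=["size"])   # True
#   self._metadataEqual({"a": {"b": 1}}, {"a": {"b": 2}},
#                       skip=["a.b"])                             # True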
def get_token(self):
""" provides ingestor token
:returns: ingestor token
:rtype: :obj:`str`
"""
try:
response = requests.post(
self.__tokenurl, headers=self.__headers,
json={"username": self.__username, "password": self.__incd})
if response.ok:
return json.loads(response.content)["id"]
else:
raise Exception("%s" % response.text)
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return ""
def append_proposal_groups(self):
""" appends owner and access groups from the proposal to the beamtime
metadata
:returns: updated beamtime configuration
:rtype: :obj:`dict` <:obj:`str`, `any`>
"""
token = self.get_token()
bid = self.__meta["beamtimeId"]
try:
self.__headers["Authorization"] = "Bearer {}".format(token)
propid = self.__idpattern.format(
beamtimeId=self.__bid.replace("/", "%2F"),
proposalId=self.__dpid.replace("/", "%2F"))
resexists = requests.get(
"{url}/{pid}"
.format(
url=self.__proposalurl,
pid=propid),
headers=self.__headers,
params={"access_token": token}
)
if resexists.ok:
pexists = bool(resexists.content)
else:
raise Exception(
"Proposal %s: %s"
% (propid, resexists.text or '{\"exists\": false}'))
if pexists:
resget = resexists
if resget.ok:
proposal = json.loads(resget.content)
if "ownerGroup" not in self.__meta and \
"ownerGroup" in proposal:
self.__meta["ownerGroup"] = proposal["ownerGroup"]
self.__ownergroup = self.__meta["ownerGroup"]
self.__dctfmt["ownergroup"] = self.__ownergroup
if "accessGroups" not in self.__meta and \
"accessGroups" in proposal:
self.__meta["accessGroups"] = list(
proposal["accessGroups"])
self.__accessgroups = \
",".join(self.__meta["accessGroups"])
self.__dctfmt["accessgroups"] = self.__accessgroups
else:
raise Exception(
"Proposal %s: %s" % (bid, resget.text))
else:
raise Exception("Proposal %s: %s" %
(bid, resexists.text or '{\"exists\": false}'))
except Exception as e:
get_logger().warning('%s' % (str(e)))
return self.__meta
def _post_dataset(self, mdic, token, mdct):
""" post dataset with a new pid
:param mdic: updated metadata dictionary
:type mdic: :obj:`dct` <:obj:`str`, `any`>
:param token: ingestor token
:type token: :obj:`str`
:param mdct: original metadata dictionary
:type mdct: :obj:`dct` <:obj:`str`, `any`>
:returns: dataset pid
:rtype: :obj:`str`
"""
# create a new dataset since
# core metadata of dataset were changed
# find a new pid
pexist = True
npid = mdic["pid"]
ipid = mdct["pid"]
while pexist:
npre = ""
if npid.startswith(self.__pidprefix):
npre = self.__pidprefix
npid = npid[len(npre):]
spid = npid.split("/")
if len(spid) > 2:
try:
ver = int(spid[-1])
spid[-1] = str(ver + 1)
except Exception:
spid.append("2")
else:
spid.append("2")
npid = npre + "/".join(spid)
if len(spid) > 0:
ipid = npre + "/".join(spid)
self.__headers["Authorization"] = "Bearer {}".format(token)
resexists = requests.get(
"{url}/{pid}"
.format(
url=self.__dataseturl,
pid=(npre + npid.replace("/", "%2F"))),
headers=self.__headers,
params={"access_token": token})
if resexists.ok:
pexist = bool(resexists.content)
else:
raise Exception("%s" % resexists.text)
mdic["pid"] = ipid
nmeta = json.dumps(mdic)
get_logger().info(
'DatasetIngestor: '
'Post the dataset with a new pid: %s' % (npid))
# post the dataset with the new pid
self.__headers["Authorization"] = "Bearer {}".format(token)
response = requests.post(
self.__dataseturl,
params={"access_token": token},
headers=self.__headers,
data=nmeta)
if response.ok:
return mdic["pid"]
else:
raise Exception("%s" % response.text)
def _patch_dataset(self, nmeta, pid, token, mdct):
""" post dataset
:param nmeta: metadata in json string
:type nmeta: :obj:`str`
:param pid: dataset pid
:type pid: :obj:`str`
:param token: ingestor token
:type token: :obj:`str`
:param mdct: metadata in dct
:type mdct: :obj:`dct` <:obj:`str`, `any`>
:returns: dataset pid
:rtype: :obj:`str`
"""
get_logger().info(
'DatasetIngestor: '
'Patch scientificMetadata of dataset:'
' %s' % (pid))
self.__headers["Authorization"] = "Bearer {}".format(token)
response = requests.patch(
"{url}/{pid}"
.format(
url=self.__dataseturl,
pid=pid.replace("/", "%2F")),
params={"access_token": token},
headers=self.__headers,
data=nmeta)
if response.ok:
return mdct["pid"]
else:
raise Exception("%s" % response.text)
def _ingest_dataset(self, metadata, token, mdct):
""" ingests dataset
:param metadata: metadata in json string
:type metadata: :obj:`str`
:param token: ingestor token
:type token: :obj:`str`
:param mdct: metadata in dct
:type mdct: :obj:`dct` <:obj:`str`, `any`>
:returns: dataset pid
:rtype: :obj:`str`
"""
try:
pid = "%s%s" % (self.__pidprefix, mdct["pid"])
# check if dataset with the pid exists
get_logger().info(
'DatasetIngestor: Check if dataset exists: %s' % (pid))
checking = True
counter = 0
self.__headers["Authorization"] = "Bearer {}".format(token)
while checking:
resexists = requests.get(
"{url}/{pid}".format(
url=self.__dataseturl,
pid=pid.replace("/", "%2F")),
headers=self.__headers,
params={"access_token": token}
)
if hasattr(resexists, "content"):
try:
json.loads(resexists.content)
checking = False
except Exception:
time.sleep(0.1)
if resexists.ok and hasattr(resexists, "content") and \
not bool(resexists.content):
checking = False
else:
time.sleep(0.1)
if counter == self.__maxcounter:
checking = False
counter += 1
if resexists.ok and hasattr(resexists, "content"):
try:
exists = bool(resexists.content)
except Exception:
exists = False
if not exists:
# post the new dataset since it does not exist
get_logger().info(
'DatasetIngestor: Post the dataset: %s' % (pid))
self.__headers["Authorization"] = "Bearer {}".format(token)
response = requests.post(
self.__dataseturl,
headers=self.__headers,
params={"access_token": token},
data=metadata)
if response.ok:
return mdct["pid"]
else:
raise Exception("%s" % response.text)
elif self.__strategy != UpdateStrategy.NO:
# find dataset by pid
get_logger().info(
'DatasetIngestor: Find the dataset by id: %s' % (pid))
resds = requests.get(
"{url}/{pid}".format(
url=self.__dataseturl,
pid=pid.replace("/", "%2F")),
headers=self.__headers,
params={"access_token": token}
)
if resds.ok:
dsmeta = json.loads(resds.content)
mdic = dict(mdct)
mdic["pid"] = pid
if self.__forcemeasurementkeyword and \
self.__dctfmt["measurement"] and \
"keywords" in mdic and \
isinstance(mdic["keywords"], list) and \
self.__dctfmt["measurement"] \
not in mdic["keywords"]:
mdic["keywords"].append(
self.__dctfmt["measurement"])
if not self._metadataEqual(
dsmeta, mdic, skip=self.__withoutsm):
if self.__strategy in [
UpdateStrategy.PATCH, UpdateStrategy.NO]:
nmeta = json.dumps(mdic)
# mm = dict(mdic)
# mm["scientificMetadata"] = {}
# get_logger().info(
# 'DatasetIngestor: PATCH: %s' % str(mm))
return self._patch_dataset(
nmeta, pid, token, mdct)
else:
return self._post_dataset(mdic, token, mdct)
else:
if "scientificMetadata" in dsmeta.keys() and \
"scientificMetadata" in mdic.keys():
smmeta = dsmeta["scientificMetadata"]
smnmeta = mdic["scientificMetadata"]
nmeta = json.dumps(mdic)
if not self._metadataEqual(smmeta, smnmeta):
if self.__strategy == \
UpdateStrategy.CREATE:
return self._post_dataset(
mdic, token, mdct)
else:
return self._patch_dataset(
nmeta, pid, token, mdct)
else:
raise Exception("%s" % resds.text)
else:
return pid
else:
raise Exception("%s" % resexists.text)
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return None
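# Decision summary (illustrative) for an already existing dataset pid:
#   core (non-scientificMetadata) fields changed:
#       PATCH -> patch the dataset, CREATE/MIXED -> post with a new pid
#   only scientificMetadata changed:
#       CREATE -> post with a new pid, PATCH/MIXED -> patch the dataset
#   strategy NO -> leave the dataset unchanged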
def _ingest_origdatablock(self, metadata, token):
""" ingets origdatablock
:param metadata: metadata in json string
:type metadata: :obj:`str`
:param token: ingestor token
:type token: :obj:`str`
:returns: rewquest startus
:rtype: :obj:`bool`
"""
try:
self.__headers["Authorization"] = "Bearer {}".format(token)
response = requests.post(
self.__datablockurl,
headers=self.__headers,
params={"access_token": token},
data=metadata)
if response.ok:
return True
else:
raise Exception("%s" % response.text)
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return False
def _ingest_attachment(self, metadata, datasetid, token):
""" ingets origdatablock
:param metadata: metadata in json string
:type metadata: :obj:`str`
:param datasetid: dataset id
:type datasetid: :obj:`str`
:param token: ingestor token
:type token: :obj:`str`
:returns: rewquest startus
:rtype: :obj:`bool`
"""
try:
dsid = datasetid.replace("/", "%2F")
url = self.__attachmenturl
# get_logger().debug(
# 'DatasetIngestor: ingest attachment %s' % (
# url.format(pid=dsid, token=token)))
self.__headers["Authorization"] = "Bearer {}".format(token)
response = requests.post(
url.format(pid=dsid),
headers=self.__headers,
params={"access_token": token},
data=metadata)
if response.ok:
return True
else:
raise Exception("%s" % response.text)
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return False
def _get_origdatablocks(self, datasetid, token):
""" get origdatablocks with datasetid
:param datasetid: dataset id
:type datasetid: :obj:`str`
:param token: ingestor token
:type token: :obj:`str`
:returns: list of origdatablocks
:rtype: :obj:`list` <:obj:`dict` <:obj:`str`, `any`>>
"""
try:
self.__headers["Authorization"] = "Bearer {}".format(token)
response = requests.get(
self.__dataseturl + "/%s/%s" %
(datasetid.replace("/", "%2F"), self.__scicat_datablocks),
params={"access_token": token},
headers=self.__headers)
if response.ok:
js = response.json()
return js
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return None
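# Example (illustrative): for datasetid "12345678/scan_00001" the GET
# request above targets
#   <scicat_url>/Datasets/12345678%2Fscan_00001/OrigDatablocks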
def _get_delete_origdatablock(self, did, token):
""" ingets origdatablock
:param did: origdatablock id
:type did: :obj:`str`
:param token: ingestor token
:type token: :obj:`str`
"""
try:
self.__headers["Authorization"] = "Bearer {}".format(token)
response = requests.delete(
"{url}/{pid}"
.format(
url=self.__datablockurl,
pid=did.replace("/", "%2F")),
params={"access_token": token},
headers=self.__headers,
)
if response.ok:
return True
else:
raise Exception("%s" % response.text)
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return None
def _get_attachments(self, datasetid, token):
""" get attachments with datasetid
:param datasetid: dataset id
:type datasetid: :obj:`str`
:param token: ingestor token
:type token: :obj:`str`
:returns: list of attachments
:rtype: :obj:`list` <:obj:`dict` <:obj:`str`, `any`>>
"""
try:
self.__headers["Authorization"] = "Bearer {}".format(token)
response = requests.get(self.__attachmenturl.format(
pid=datasetid.replace("/", "%2F")),
params={"access_token": token},
headers=self.__headers
)
if response.ok:
js = response.json()
return js
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return None
def _get_delete_attachment(self, datasetid, aid, token):
""" ingets attachment
:param datasetid: dataset id
:type datasetid: :obj:`str`
:param aid: attachment id
:type aid: :obj:`str`
:param token: ingestor token
:type token: :obj:`str`
"""
try:
self.__headers["Authorization"] = "Bearer {}".format(token)
response = requests.delete(
self.__attachmenturl.format(
pid=datasetid.replace("/", "%2F"))
+ "/{aid}".format(aid=aid.replace("/", "%2F")),
params={"access_token": token},
headers=self.__headers)
if response.ok:
return True
else:
raise Exception("%s" % response.text)
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return None
def _get_pid(self, metafile):
""" get pid from raw dataset metadata
:param metafile: metadata file name
:type metafile: :obj:`str`
:returns: dataset pid
:rtype: :obj:`str`
"""
pid = None
try:
with open(metafile) as fl:
smt = fl.read()
mt = json.loads(smt)
pid = mt["pid"]
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return pid
def _ingest_rawdataset_metadata(self, metafile, token):
""" ingest raw dataset metadata
:param metafile: metadata file name
:type metafile: :obj:`str`
:param token: ingestor token
:type token: :obj:`str`
:returns: dataset id
:rtype: :obj:`str`
"""
try:
with open(metafile) as fl:
smt = fl.read()
mt = json.loads(smt)
spid = self.__idpattern.format(
beamtimeId=self.__bid, proposalId=self.__dpid)
if mt["type"] == "raw" and mt["proposalId"] != spid:
raise Exception(
"Wrong SC proposalId %s for DESY beamtimeId %s in %s"
% (mt["proposalId"], self.__bid, metafile))
if not mt['pid'] or \
not mt["pid"].startswith("%s/" % (self.__bid)):
raise Exception(
"Wrong pid %s for DESY beamtimeId %s in %s"
% (mt["pid"], self.__bid, metafile))
status = self._ingest_dataset(smt, token, mt)
if status:
return status
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return None
def _delete_origdatablocks(self, pid, token):
""" delete origdatablock with given dataset pid
:param pid: dataset id
:type pid: :obj:`str`
:param token: ingestor token
:type token: :obj:`str`
:returns: dataset id
:rtype: :obj:`str`
"""
try:
datasetid = "%s%s" % (self.__pidprefix, pid)
odbs = self._get_origdatablocks(datasetid, token) or []
for odb in odbs:
if "id" in odb:
self._get_delete_origdatablock(odb["id"], token)
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return ""
def _delete_attachments(self, pid, token):
""" delete attachment with given dataset pid
:param pid: dataset id
:type pid: :obj:`str`
:param token: ingestor token
:type token: :obj:`str`
:returns: dataset id
:rtype: :obj:`str`
"""
try:
datasetid = "%s%s" % (self.__pidprefix, pid)
# get_logger().info("DA %s %s" % (pid, datasetid))
odbs = self._get_attachments(datasetid, token) or []
# get_logger().info("DA2 %s %s" % (pid, odbs))
for odb in odbs:
if "id" in odb:
# get_logger().info("DA3 %s %s" % (odb["id"], odb))
self._get_delete_attachment(datasetid, odb["id"], token)
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return ""
def _update_attachments(self, tads, pid, token):
""" delete attachment with given dataset pid
:param pid: dataset id
:type pid: :obj:`str`
:param token: ingestor token
:type token: :obj:`str`
:returns: dataset id
:rtype: :obj:`str`
"""
dastatus = ""
try:
datasetid = "%s%s" % (self.__pidprefix, pid)
# get_logger().info("DA %s %s" % (pid, datasetid))
odbs = self._get_attachments(datasetid, token) or []
# get_logger().info("DA2 %s %s" % (pid, odbs))
found = []
for fads in tads:
with open(fads) as fl:
smt = fl.read()
ads = json.loads(smt)
if "thumbnail" in ads:
for odb in odbs:
if "thumbnail" in odb and \
odb["thumbnail"] == ads["thumbnail"]:
if "id" in odb:
found.append(odb["id"])
break
else:
dastatus = self._ingest_attachment_metadata(
fads, pid, token)
get_logger().info(
"DatasetIngestor: Ingest attachment: %s"
% (fads))
for odb in odbs:
if "id" in odb and odb["id"] not in found:
self._get_delete_attachment(datasetid, odb["id"], token)
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return dastatus
def _ingest_origdatablock_metadata(self, metafile, pid, token):
""" ingest origdatablock metadata
:param metafile: metadata file name
:type metafile: :obj:`str`
:param pid: dataset id
:type pid: :obj:`str`
:param token: ingestor token
:type token: :obj:`str`
:returns: dataset id
:rtype: :obj:`str`
"""
try:
with open(metafile) as fl:
smt = fl.read()
mt = json.loads(smt)
if not pid or not pid.startswith(self.__bid):
raise Exception(
"Wrong origdatablock datasetId %s for DESY beamtimeId "
"%s in %s"
% (pid, self.__bid, metafile))
if mt["datasetId"] != "%s%s" % (self.__pidprefix, pid):
mt["datasetId"] = "%s%s" % (self.__pidprefix, pid)
smt = json.dumps(mt)
with open(metafile, "w") as mf:
mf.write(smt)
status = time.time()
if "dataFileList" in mt and mt["dataFileList"]:
status = self._ingest_origdatablock(smt, token)
if status:
return mt["datasetId"]
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return ""
def _ingest_attachment_metadata(self, metafile, pid, token):
""" ingest attachment metadata
:param metafile: metadata file name
:type metafile: :obj:`str`
:param pid: dataset id
:type pid: :obj:`str`
:param token: ingestor token
:type token: :obj:`str`
:returns: dataset id
:rtype: :obj:`str`
"""
try:
with open(metafile) as fl:
smt = fl.read()
mt = json.loads(smt)
if "datasetId" in mt:
if not pid or not pid.startswith(self.__bid):
raise Exception(
"Wrong attachment datasetId %s for DESY beamtimeId "
"%s in %s"
% (pid, self.__bid, metafile))
if mt["datasetId"] != "%s%s" % (self.__pidprefix, pid):
mt["datasetId"] = "%s%s" % (self.__pidprefix, pid)
smt = json.dumps(mt)
with open(metafile, "w") as mf:
mf.write(smt)
else:
mt["datasetId"] = "%s%s" % (self.__pidprefix, pid)
smt = json.dumps(mt)
with open(metafile, "w") as mf:
mf.write(smt)
dsid = "%s%s" % (self.__pidprefix, pid)
status = self._ingest_attachment(smt, dsid, token)
if status:
return dsid
except Exception as e:
get_logger().error(
'DatasetIngestor: %s' % (str(e)))
return ""
def ingest(self, scan, token):
""" ingest scan
:param scan: scan name
:type scan: :obj:`str`
:param token: access token
:type token: :obj:`str`
"""
get_logger().info(
'DatasetIngestor: Ingesting: %s %s' % (
self.__dsfile, scan))
sscan = scan.split(" ")
self.__dctfmt["entryname"] = ""
self.__dctfmt["scanname"] = sscan[0] if len(sscan) > 0 else ""
self.__dctfmt["masterscanname"] = self.__dctfmt["scanname"]
sndir, snname = os.path.split(str(self.__dctfmt["scanname"]))
plist = []
self.__dctfmt["dbrelpath"] = ""
if self.__relpath_in_datablock:
plist.append(self.__dctfmt["relpath"])
if sndir:
plist.append(sndir)
if plist:
self.__dctfmt["dbrelpath"] = os.path.join(*plist)
rdss = glob.glob(
"{metapath}/{scan}{postfix}".format(
scan=self.__dctfmt["scanname"],
postfix=self.__scanpostfix,
metapath=self.__dctfmt["metapath"]))
if rdss and rdss[0]:
rds = rdss[0]
elif self.__forcegeneratemeasurement or \
self.__dctfmt["scanname"] not in self.__measurements:
rds = self._generate_rawdataset_metadata(self.__dctfmt["scanname"])
else:
rds = []
mtmds = 0
ads = None
if rds:
mtmds = os.path.getmtime(rds)
odbs = glob.glob(
"{metapath}/{scan}{postfix}".format(
scan=self.__dctfmt["scanname"],
postfix=self.__datablockpostfix,
metapath=self.__dctfmt["metapath"]))
if odbs and odbs[0] and not self.__single_datablock:
odb = odbs[0]
todb = [odb]
with open(odb) as fl:
dbmt = json.loads(fl.read())
if isinstance(dbmt, list):
if self.__skip_multi_datablock:
todb = []
else:
todb = dbmt
else:
odb = self._generate_origdatablock_metadata(scan)
todb = [odb]
mtmdb = 0
if odb:
mtmdb = os.path.getmtime(odb)
mtmda = 0
if self.__ingest_attachment:
adss = glob.glob(
"{metapath}/{scan}{postfix}".format(
scan=self.__dctfmt["scanname"],
postfix=self.__attachmentpostfix,
metapath=self.__dctfmt["metapath"]))
if adss and adss[0]:
ads = adss[0]
tads = [ads]
with open(ads) as fl:
admt = json.loads(fl.read())
if isinstance(admt, list):
if self.__skip_multi_attachment:
tads = []
else:
tads = admt
else:
ads = self._generate_attachment_metadata(
self.__dctfmt["scanname"])
tads = [ads]
if ads:
mtmda = os.path.getmtime(ads)
if (self.__callcallback or self.__measurement_status) \
and self.__metadatageneratedcallback \
and rds and odb:
command = self.__metadatageneratedcallback.format(**self.__dctfmt)
get_logger().info(
'DatasetIngestor: Metadata generated callback: %s ' % (
command))
subprocess.run(command, shell=True, check=True)
dbstatus = None
dastatus = None
pid = None
if rds and odb and not self.__skip_scan_dataset_ingestion:
if rds and rds[0]:
pid = self._ingest_rawdataset_metadata(rds, token)
if todb and todb[0] and pid:
if pid is None and rdss and rdss[0]:
pid = self._get_pid(rdss[0])
for odb in todb:
dbstatus = self._ingest_origdatablock_metadata(
odb, pid, token)
if not dbstatus:
mtmdb = -1
if pid is None and rdss and rdss[0]:
pid = self._get_pid(rdss[0])
if self.__ingest_attachment and tads and tads[0] and pid:
if pid is None and rdss and rdss[0]:
pid = self._get_pid(rdss[0])
for ads in tads:
dastatus = self._ingest_attachment_metadata(
ads, pid, token)
if not dastatus:
mtmda = -1
if pid is None:
if scan in self.__sc_seingested_map.keys():
mtmds = self.__sc_seingested_map[scan][-3]
else:
mtmds = 0
if dbstatus is None:
if scan in self.__sc_seingested_map.keys():
mtmdb = self.__sc_seingested_map[scan][-2]
else:
mtmdb = 0
if dastatus is None:
if scan in self.__sc_seingested_map.keys():
mtmda = self.__sc_seingested_map[scan][-1]
else:
mtmda = 0
sscan.extend([str(mtmds), str(mtmdb), str(mtmda)])
self.__sc_ingested.append(sscan)
self.__sc_seingested_map[scan] = [mtmds, mtmdb, mtmda]
with open(self.__idsfile, 'a+') as f:
f.write("%s %s %s %s\n" % (scan, mtmds, mtmdb, mtmda))
[docs] def reingest(self, scan, token, notmp=False):
""" re-ingest scan
:param scan: scan name
:type scan: :obj:`str`
:param token: access token
:type token: :obj:`str`
:param notmp: no tmp file flag
:type notmp: :obj:`bool`
"""
get_logger().info(
'DatasetIngestor: Checking: %s %s' % (
self.__dsfile, scan))
reingest_dataset = False
reingest_origdatablock = False
reingest_attachment = False
sscan = scan.split(" ")
pscan = scan
self.__dctfmt["scanname"] = ""
self.__dctfmt["masterscanname"] = ""
self.__dctfmt["entryname"] = ""
if len(sscan) > 0:
if "::/" in sscan[0]:
if ";" in sscan[0]:
pscan, scanname = sscan[0].split(";")[:2]
else:
pscan = sscan[0]
scanname = sscan[0]
if ":" in scanname:
scanname = scanname.split(":")[0]
if "::/" in pscan:
gname, entryname = pscan.split("::/")[:2]
else:
gname, entryname = sscan[0].split("::/")[:2]
self.__dctfmt["scanname"] = scanname
self.__dctfmt["masterscanname"] = gname
self.__dctfmt["entryname"] = entryname
elif ":" in sscan[0]:
self.__dctfmt["scanname"] = sscan[0].split(":")[0]
pscan = " ".join([self.__dctfmt["scanname"]] + sscan[1:])
self.__dctfmt["masterscanname"] = self.__dctfmt["scanname"]
else:
self.__dctfmt["scanname"] = sscan[0]
self.__dctfmt["masterscanname"] = self.__dctfmt["scanname"]
sndir, snname = os.path.split(str(self.__dctfmt["scanname"]))
plist = []
self.__dctfmt["dbrelpath"] = ""
if self.__relpath_in_datablock:
plist.append(self.__dctfmt["relpath"])
if sndir:
plist.append(sndir)
if plist:
self.__dctfmt["dbrelpath"] = os.path.join(*plist)
rds = None
rdss = glob.glob(
"{metapath}/{scan}{postfix}".format(
scan=self.__dctfmt["scanname"],
postfix=self.__scanpostfix,
metapath=self.__dctfmt["metapath"]))
if rdss and rdss[0]:
rds = rdss[0]
mtm = os.path.getmtime(rds)
# print(self.__sc_ingested_map.keys())
get_logger().debug("MAP: %s" % (self.__sc_ingested_map))
if scan in self.__sc_ingested_map.keys():
get_logger().debug("DS Timestamps: %s %s %s %s" % (
scan,
mtm, self.__sc_ingested_map[scan][-3],
mtm > self.__sc_ingested_map[scan][-3]))
if scan not in self.__sc_ingested_map.keys() \
or mtm > self.__sc_ingested_map[scan][-3]:
if self.__strategy != UpdateStrategy.NO:
reingest_dataset = True
elif self.__forcegeneratemeasurement or \
self.__dctfmt["scanname"] not in self.__measurements:
rds = self._generate_rawdataset_metadata(
self.__dctfmt["scanname"])
get_logger().debug("DS No File: %s True" % (scan))
reingest_dataset = True
else:
rds = []
mtmds = 0
if rds:
mtmds = os.path.getmtime(rds)
odbs = glob.glob(
"{metapath}/{scan}{postfix}".format(
scan=self.__dctfmt["scanname"],
postfix=self.__datablockpostfix,
metapath=self.__dctfmt["metapath"]))
# initialize olst up front so the later "if not olst:" check
# cannot raise NameError when no datablock file exists yet
olst = False
if odbs and odbs[0] and not self.__single_datablock:
odb = odbs[0]
todb = [odb]
with open(odb) as fl:
dbmt = json.loads(fl.read())
if isinstance(dbmt, list):
olst = True
if self.__skip_multi_datablock:
todb = []
else:
todb = dbmt
mtm0 = os.path.getmtime(odb)
if scan not in self.__sc_ingested_map.keys() \
or mtm0 > self.__sc_ingested_map[scan][-2]:
reingest_origdatablock = True
if scan in self.__sc_ingested_map.keys():
get_logger().debug("DB0 Timestamps: %s %s %s %s %s" % (
scan,
mtm0, self.__sc_ingested_map[scan][-2],
mtm0 - self.__sc_ingested_map[scan][-2],
reingest_origdatablock)
)
if not olst:
self._regenerate_origdatablock_metadata(
pscan, reingest_origdatablock)
mtm = os.path.getmtime(odb)
if scan in self.__sc_ingested_map.keys():
get_logger().debug("DB Timestamps: %s %s %s %s" % (
scan,
mtm, self.__sc_ingested_map[scan][-2],
mtm > self.__sc_ingested_map[scan][-2]))
if scan not in self.__sc_ingested_map.keys() \
or mtm > self.__sc_ingested_map[scan][-2]:
reingest_origdatablock = True
else:
odb = self._generate_origdatablock_metadata(pscan)
todb = [odb]
get_logger().debug("DB No File: %s True" % (scan))
reingest_origdatablock = True
mfilename = ""
odb2 = ""
if self.__dctfmt["masterscanname"] != self.__dctfmt["scanname"]:
mfilename = "{metapath}/_{masterscanname}{datablockpostfix}". \
format(**self.__dctfmt)
odb2 = self._regenerate_origdatablock_metadata(
self.__dctfmt["masterscanname"], True, mfilename=mfilename)
if odb2 and os.path.isfile(odb2) and odb2 not in todb:
todb.insert(0, odb2)
reingest_origdatablock = True
mtmdb = 0
if odb:
mtmdb = os.path.getmtime(odb)
if (self.__callcallback or self.__measurement_status) \
and self.__metadatageneratedcallback \
and rds and odb:
command = self.__metadatageneratedcallback.format(**self.__dctfmt)
get_logger().info(
'DatasetIngestor: Metadata generated callback: %s ' % (
command))
subprocess.run(command, shell=True, check=True)
dastatus = None
dbstatus = None
ads = None
tads = []
if self.__ingest_attachment:
adss = glob.glob(
"{metapath}/{scan}{postfix}".format(
scan=self.__dctfmt["scanname"],
postfix=self.__attachmentpostfix,
metapath=self.__dctfmt["metapath"]))
if adss and adss[0]:
ads = adss[0]
tads = [ads]
with open(ads) as fl:
admt = json.loads(fl.read())
if isinstance(admt, list):
if self.__skip_multi_attachment:
tads = []
else:
tads = admt
mtm0 = os.path.getmtime(ads)
if scan in self.__sc_ingested_map.keys():
get_logger().debug(
"ATTRIBUTE REINGEST check: %s ?? %s"
% (mtm0, self.__sc_ingested_map[scan][-1]))
if scan not in self.__sc_ingested_map.keys() \
or mtm0 > self.__sc_ingested_map[scan][-1]:
reingest_attachment = True
else:
ads = self._generate_attachment_metadata(
self.__dctfmt["scanname"])
reingest_attachment = True
tads = [ads]
if ads:
mtm0 = os.path.getmtime(ads)
pid = None
if (rds and odb) or ads:
if rds and reingest_dataset:
pid = self._ingest_rawdataset_metadata(rds, token)
get_logger().info(
"DatasetIngestor: Ingest dataset: %s" % (rds))
oldpid = self._get_pid(rds)
if pid and oldpid != pid:
if not olst:
odb = self._generate_origdatablock_metadata(scan)
reingest_origdatablock = True
if todb and todb[0] and reingest_origdatablock:
if pid is None and rdss and rdss[0]:
pid = self._get_pid(rdss[0])
self._delete_origdatablocks(pid, token)
for odb in todb:
dbstatus = self._ingest_origdatablock_metadata(
odb, pid, token)
get_logger().info(
"DatasetIngestor: Ingest origdatablock: %s" % (odb))
if not dbstatus:
mtmdb = -1
get_logger().debug("Ingest Attachment %s %s %s" % (
self.__ingest_attachment, tads, reingest_attachment))
if self.__ingest_attachment:
if tads and tads[0] and reingest_attachment:
if pid is None and rdss and rdss[0]:
pid = self._get_pid(rdss[0])
if not pid:
get_logger().error(
"DatasetIngestor: No dataset pid "
"for the attachment found: %s" % (ads))
else:
get_logger().debug("Attachment PID %s %s"
% (tads, pid))
if self.__strategy in [UpdateStrategy.PATCH,
UpdateStrategy.MIXED]:
dastatus = self._update_attachments(
tads, pid, token)
else:
self._delete_attachments(pid, token)
for ads in tads:
dastatus = self._ingest_attachment_metadata(
ads, pid, token)
get_logger().info(
"DatasetIngestor: Ingest attachment: %s"
% (ads))
if not dastatus:
mtmda = -1
mtmda = 0
if ads:
mtmda = os.path.getmtime(ads)
if (pid and reingest_dataset):
pass
elif scan in self.__sc_ingested_map.keys():
mtmds = self.__sc_ingested_map[scan][-3]
else:
mtmds = 0
if (dbstatus and reingest_origdatablock):
pass
elif scan in self.__sc_ingested_map.keys():
mtmdb = self.__sc_ingested_map[scan][-2]
else:
mtmdb = 0
if (dastatus and reingest_attachment):
pass
elif scan in self.__sc_ingested_map.keys():
mtmda = self.__sc_ingested_map[scan][-1]
else:
mtmda = 0
sscan.extend([str(mtmds), str(mtmdb), str(mtmda)])
self.__sc_ingested.append(sscan)
self.__sc_seingested_map[scan] = [mtmds, mtmdb, mtmda]
lfile = self.__idsfiletmp
if notmp:
lfile = self.__idsfile
with open(lfile, 'a+') as f:
f.write("%s %s %s %s\n" % (scan, mtmds, mtmdb, mtmda))
[docs] def check_list(self, reingest=False):
""" update waiting and ingested datasets
"""
with open(self.__dsfile, "r") as dsf:
scans = [sc.strip()
for sc in dsf.read().split("\n")
if sc.strip()]
if os.path.isfile(self.__idsfile):
with open(self.__idsfile, "r") as idsf:
self.__sc_ingested = [
sc.strip().split(" ")
for sc in idsf.read().split("\n")
if sc.strip()]
for sc in self.__sc_ingested:
try:
if len(sc) > 3:
self.__sc_seingested_map[" ".join(sc[:-3])] = \
[float(sc[-3]),
float(sc[-2]),
float(sc[-1])]
except Exception as e:
get_logger().debug("%s" % str(e))
if not reingest:
if self.__retry_failed_dataset_ingestion:
check_attach = self.__retry_failed_attachment_ingestion \
and self.__ingest_attachment
ingested = []
for sc in self.__sc_ingested:
if len(sc) > 3:
try:
if float(sc[-1]) != -1 \
and (not check_attach or float(sc[-1]) > 0) \
and float(sc[-2]) > 0 and float(sc[-3]) > 0:
ingested.append(" ".join(sc[:-3]))
except Exception as e:
get_logger().debug("%s" % str(e))
else:
ingested.append(sc[0])
else:
ingested = [(" ".join(sc[:-3]) if len(sc) > 3 else sc[0])
for sc in self.__sc_ingested]
self.__sc_ingested_map = {}
for sc in self.__sc_ingested:
try:
if len(sc) > 3 and float(sc[-1]) >= 0 \
and float(sc[-2]) > 0 and float(sc[-3]) > 0:
sc[-1] = float(sc[-1])
sc[-2] = float(sc[-2])
sc[-3] = float(sc[-3])
self.__sc_ingested_map[" ".join(sc[:-3])] = sc
except Exception as e:
get_logger().debug("%s" % str(e))
self.__sc_waiting = [
sc for sc in scans if sc not in ingested]
else:
self.__sc_waiting = [sc for sc in scans]
self.__sc_ingested_map = {}
for sc in self.__sc_ingested:
try:
if len(sc) > 3 and float(sc[-1]) >= 0 \
and float(sc[-2]) > 0 and float(sc[-3]) > 0:
sc[-1] = float(sc[-1])
sc[-2] = float(sc[-2])
sc[-3] = float(sc[-3])
self.__sc_ingested_map[" ".join(sc[:-3])] = sc
except Exception as e:
get_logger().debug("%s" % str(e))
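# The ingested-dataset list parsed above is line-oriented; each entry
# ends with three timestamps (values illustrative):
#
#     scan_00001 1678891234.5 1678891240.1 1678891241.8
#
# i.e. "<scan> <dataset mtime> <origdatablock mtime> <attachment mtime>",
# where -1 marks a failed ingestion attempt and 0 a missing component.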
[docs] def waiting_datasets(self):
""" provides waitings datasets
:returns: waitings datasets list
:rtype: :obj:`list` <:obj:`str`>
"""
return list(self.__sc_waiting)
[docs] def clear_waiting_datasets(self):
""" clear waitings datasets
"""
self.__sc_waiting = []
self.__measurements = set()
[docs] def clear_tmpfile(self):
""" clear waitings datasets
"""
if os.path.exists(self.__idsfiletmp):
os.remove(self.__idsfiletmp)
[docs] def update_from_tmpfile(self):
""" clear waitings datasets
"""
os.rename(self.__idsfiletmp, self.__idsfile)
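# Note: os.rename() replaces self.__idsfile in a single step when both
# files live on the same filesystem, so readers of the ingested list
# never observe a partially written file.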
[docs] def ingested_datasets(self):
""" provides ingested datasets
:returns: ingested datasets list
:rtype: :obj:`list` <:obj:`str`>
"""
return list(self.__sc_ingested)
[docs] def stop_measurement(self):
""" stop measurement
"""
self.__measurement_status = False
self.__dctfmt["measurement"] = ""
get_logger().debug("Stop Measurement: %s" % self.__measurement)
[docs] def start_measurement(self, measurement):
""" start measurement
:param measurement: measurement name
:type measurement: :obj:`str`
"""
self.__measurement = measurement
self.__dctfmt["measurement"] = self.__measurement
self.__dctfmt["lastmeasurement"] = self.__measurement
self.__measurements.add(self.__measurement)
self.__measurement_status = True
get_logger().debug("Start Measurement: %s" % self.__measurement)