# Copyright 2021 MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
import json
import logging
import os
from random import randint
from typing import List, Optional, Union
import numpy as np
from monai.deploy.utils.importutil import optional_import
from monai.deploy.utils.version import get_sdk_semver
dcmread, _ = optional_import("pydicom", name="dcmread")
generate_uid, _ = optional_import("pydicom.uid", name="generate_uid")
ImplicitVRLittleEndian, _ = optional_import("pydicom.uid", name="ImplicitVRLittleEndian")
Dataset, _ = optional_import("pydicom.dataset", name="Dataset")
FileDataset, _ = optional_import("pydicom.dataset", name="FileDataset")
Sequence, _ = optional_import("pydicom.sequence", name="Sequence")
sitk, _ = optional_import("SimpleITK")
from monai.deploy.core import (
DataPath,
ExecutionContext,
Image,
InputContext,
IOType,
Operator,
OutputContext,
env,
input,
output,
)
from monai.deploy.core.domain.dicom_series import DICOMSeries
from monai.deploy.operators.dicom_data_loader_operator import DICOMDataLoaderOperator
from monai.deploy.operators.dicom_series_to_volume_operator import DICOMSeriesToVolumeOperator
[docs]@input("seg_image", Image, IOType.IN_MEMORY)
@input("dicom_series", DICOMSeries, IOType.IN_MEMORY)
@output("dicom_seg_instance", DataPath, IOType.DISK)
@env(pip_packages=["pydicom >= 1.4.2", "SimpleITK >= 2.0.0"])
class DICOMSegmentationWriterOperator(Operator):
"""
This operator writes out a DICOM Segmentation Part 10 file to disk
"""
# Supported input image format, based on extension.
SUPPORTED_EXTENSIONS = [".nii", ".nii.gz", ".mhd"]
# DICOM instance file extension. Case insentiive in string comaprision.
DCM_EXTENSION = ".dcm"
# Suffix to add to file name to indicate DICOM Seg dcm file.
DICOMSEG_SUFFIX = "-DICOMSEG"
def __init__(self, seg_labels: Optional[Union[List[str], str]] = None, *args, **kwargs):
super().__init__(*args, **kwargs)
"""Instantiates the DICOM Seg Writer instance with optional list of segment label strings.
A string can be used instead of a numerical value for a segment in the segmentation image.
As of now, integer values are supported for segment mask, and it is further required that the named
segment will start with 1 and increment sequentially if there are additional segments, while the
background is of value 0. The caller needs to pass in a string list, whose length corresponds
to the number of actual segments. The position index + 1 would be the corresponding segment's
numerical value.
For example, in the CT Spleen Segmentation application, the whole image background has a value
of 0, and the Spleen segment of value 1. This then only requires the caller to pass in a list
containing a single string, which is used as label for the Spleen in the DICOM Seg instance.
Note: this interface is subject to change. It is planned that a new object will encapsulate the
segment label information, including label value, name, description etc.
Args:
seg_labels: The string name for each segment
"""
self._seg_labels = ["SegmentLabel-default"]
if isinstance(seg_labels, str):
self._seg_labels = [seg_labels]
elif isinstance(seg_labels, list):
for label in seg_labels:
if not isinstance(label, str):
raise ValueError(f"List of strings expected, but contains {label} of type {type(label)}.")
self._seg_labels = seg_labels
else:
raise ValueError("List of strings expected.")
[docs] def compute(self, input: InputContext, output: OutputContext, context: ExecutionContext):
dicom_series = input.get("dicom_series")
# Get the seg image in numpy
seg_image_numpy = None
input_path = "dicom_seg"
seg_image = input.get("seg_image")
if isinstance(seg_image, Image):
seg_image_numpy = seg_image.asnumpy()
elif isinstance(seg_image, DataPath):
input_path = input.get("segmentation_image").path
input_path, _ = self.select_input_file(input_path)
seg_image_numpy = self._image_file_to_numpy(input_path)
else:
# What else
raise ValueError("seg_image is not Image or DataPath")
# Create the output path for created DICOM Seg instance
output_dir = output.get().path
output_dir.mkdir(parents=True, exist_ok=True)
output_filename = "{0}{1}{2}".format(
os.path.splitext(os.path.basename(input_path))[0],
DICOMSegmentationWriterOperator.DICOMSEG_SUFFIX,
DICOMSegmentationWriterOperator.DCM_EXTENSION,
)
output_path = os.path.join(output_dir, output_filename)
self.create_dicom_seg(seg_image_numpy, dicom_series, output_path)
def create_dicom_seg(self, image, dicom_series: DICOMSeries, file_path: str):
dicom_dataset_list = [i.get_native_sop_instance() for i in dicom_series.get_sop_instances()]
# DICOM Seg creation
self._seg_writer = DICOMSegWriter()
try:
self._seg_writer.write(image, dicom_dataset_list, file_path, self._seg_labels)
# TODO: get a class to encapsulate the seg label information.
# Test reading back
_ = self._read_from_dcm(file_path)
except Exception as ex:
print("DICOMSeg creation failed. Error:\n{}".format(ex))
raise
def _read_from_dcm(self, file_path):
"""Read dcm file into pydicom Dataset
Args:
file_path (str): The path to dcm file
"""
return dcmread(file_path)
def _image_file_to_numpy(self, input_path: str):
"""Converts image file to numpy"""
img = sitk.ReadImage(input_path)
data_np = sitk.GetArrayFromImage(img)
if data_np is None:
raise RuntimeError("Failed to convert image file to numpy: {}".format(input_path))
return data_np.astype(np.uint8)
def _get_label_list(self, stringfied_list_of_labels: str = ""):
"""Parse the string to get the label list.
If empty string is provided, a list of a single element is retured.
Args:
stringfied_list_of_labels (str): string representing the list of segmentation labels.
Returns:
list of label strings
"""
# Use json.loads as a convenience method to convert string to list of strings
assert isinstance(stringfied_list_of_labels, str), "Expected stringfiled list pf labels."
label_list = ["default-label"] # Use this as default if empty string
if stringfied_list_of_labels:
label_list = json.loads(stringfied_list_of_labels)
return label_list
class DICOMSegWriter(object):
def __init__(self):
"""Class to write DICOM SEG with the segmentation image and DICOM dataset."""
self._logger = logging.getLogger("{}.{}".format(__name__, type(self).__name__))
def write(self, seg_img, input_ds, outfile, seg_labels):
"""Write DICOM Segmentation object for the segmentation image
Args:
seg_img (numpy array): numpy array of the segmentatin image.
input_ds (list): list of Pydicom datasets of the original DICOM instances.
outfile (str): path for the output DICOM instance file.
seg_labels: list of labels for the segments
"""
if seg_img is None:
raise ValueError("Argument seg_img cannot be None.")
if not isinstance(input_ds, list) or len(input_ds) < 1:
raise ValueError("Argument input_ds must not be empty.")
if not outfile:
raise ValueError("Argument outfile must not be a valid string.")
if not isinstance(seg_labels, list) or len(seg_labels) < 1:
raise ValueError("Argument seg_labels must not be empty.")
# Find out the number of DICOM instance datasets
num_of_dcm_ds = len(input_ds)
self._logger.info("Number of DICOM instance datasets in the list: {}".format(num_of_dcm_ds))
# Find out the number of slices in the numpy array
num_of_img_slices = seg_img.shape[0]
self._logger.info("Number of slices in the numpy image: {}".format(num_of_img_slices))
# Find out the labels
self._logger.info("Labels of the segments: {}".format(seg_labels))
# Find out the unique values in the seg image
unique_elements = np.unique(seg_img, return_counts=False)
self._logger.info("Unique values in seg image: {}".format(unique_elements))
dcm_out = create_multiframe_metadata(outfile, input_ds[0])
create_label_segments(dcm_out, seg_labels)
set_pixel_meta(dcm_out, input_ds[0])
segslice_from_mhd(dcm_out, seg_img, input_ds, len(seg_labels))
self._logger.info("Saving output file {}".format(outfile))
dcm_out.save_as(outfile, False)
self._logger.info("File saved.")
# The following functions are mostly based on the implementation demo'ed at RSNA 2019.
# They can be further refactored and made into class methods, but work for now.
def safe_get(ds, key):
"""Safely gets the tag value if present from the Dataset and logs failure.
The safe get method of dict works for str, but not the hex key. The added
benefit of this funtion is that it logs the failure to get the keyed value.
Args:
ds (Dataset): pydicom Dataset
key (hex | str): Hex code or string name for a key.
"""
try:
return ds[key].value
except KeyError as e:
logging.error("Failed to get value for key: {}".format(e))
return ""
def random_with_n_digits(n):
assert isinstance(n, int), "Argument n must be a int."
n = n if n >= 1 else 1
range_start = 10 ** (n - 1)
range_end = (10 ** n) - 1
return randint(range_start, range_end)
def create_multiframe_metadata(dicom_file, input_ds):
"""Creates the DICOM metadata for the multiframe object, e.g. SEG
Args:
dicom_file (str or object): The filename or the object type of the file-like the FileDataset was read from.
input_ds (Dataset): pydicom dataset of original DICOM instance.
Returns:
FileDataset: The object with metadata assigned.
"""
currentDateRaw = datetime.datetime.now()
currentDate = currentDateRaw.strftime("%Y%m%d")
currentTime = currentDateRaw.strftime("%H%M%S.%f") # long format with micro seconds
segmentationSeriesInstanceUID = generate_uid(prefix=None)
segmentationSOPInstanceUID = generate_uid(prefix=None)
# Populate required values for file meta information
file_meta = Dataset()
file_meta.MediaStorageSOPClassUID = "1.2.840.10008.5.1.4.1.1.66.4"
file_meta.MediaStorageSOPInstanceUID = segmentationSOPInstanceUID
file_meta.ImplementationClassUID = "1.2.840.10008.5.1.4.1.1.66.4"
file_meta.TransferSyntaxUID = ImplicitVRLittleEndian
# create dicom global metadata
dicomOutput = FileDataset(dicom_file, {}, file_meta=file_meta, preamble=b"\0" * 128)
# It is important to understand the Types of DICOM attributes when getting from the original
# dataset, and creating/setting them in the new dataset, .e.g Type 1 is mandatory, though
# non-conformant instance may not have them, Type 2 present but maybe blank, and Type 3 may
# be absent.
# None of Patient module attributes are mandatory.
# The following are Type 2, present though could be blank
dicomOutput.PatientName = input_ds.get("PatientName", "") # name is actual suppoted
dicomOutput.add_new(0x00100020, "LO", safe_get(input_ds, 0x00100020)) # PatientID
dicomOutput.add_new(0x00100030, "DA", safe_get(input_ds, 0x00100030)) # PatientBirthDate
dicomOutput.add_new(0x00100040, "CS", safe_get(input_ds, 0x00100040)) # PatientSex
dicomOutput.add_new(0x00104000, "LT", safe_get(input_ds, "0x00104000")) # PatientComments
# For Study module, copy original StudyInstanceUID and other Type 2 study attributes
# Only Study Instance UID is Type 1, though still may be absent, so try to get
dicomOutput.add_new(0x0020000D, "UI", safe_get(input_ds, 0x0020000D)) # StudyInstanceUID
dicomOutput.add_new(0x00080020, "DA", input_ds.get("StudyDate", currentDate)) # StudyDate
dicomOutput.add_new(0x00080030, "TM", input_ds.get("StudyTime", currentTime)) # StudyTime
dicomOutput.add_new(0x00080090, "PN", safe_get(input_ds, 0x00080090)) # ReferringPhysicianName
dicomOutput.add_new(0x00200010, "SH", safe_get(input_ds, 0x00200010)) # StudyID
dicomOutput.add_new(0x00080050, "SH", safe_get(input_ds, 0x00080050)) # AccessionNumber
# Series module with new attribute values, only Modality and SeriesInstanceUID are Type 1
dicomOutput.add_new(0x00080060, "CS", "SEG") # Modality
dicomOutput.add_new(0x0020000E, "UI", segmentationSeriesInstanceUID) # SeriesInstanceUID
dicomOutput.add_new(0x00200011, "IS", random_with_n_digits(4)) # SeriesNumber (randomized)
descr = "CAUTION: Research Use Only. MONAI Deploy App SDK generated DICOM SEG"
if safe_get(input_ds, 0x0008103E):
descr += " for " + safe_get(input_ds, 0x0008103E)
dicomOutput.add_new(0x0008103E, "LO", descr) # SeriesDescription
dicomOutput.add_new(0x00080021, "DA", currentDate) # SeriesDate
dicomOutput.add_new(0x00080031, "TM", currentTime) # SeriesTime
# General Equipment module, only Manufacturer is Type 2, the rest Type 3
dicomOutput.add_new(0x00181000, "LO", "0000") # DeviceSerialNumber
dicomOutput.add_new(0x00080070, "LO", "MONAI Deploy") # Manufacturer
dicomOutput.add_new(0x00081090, "LO", "App SDK") # ManufacturerModelName
try:
version_str = get_sdk_semver() # SDK Version
except Exception:
version_str = "0.1" # Fall back to the initial version
dicomOutput.add_new(0x00181020, "LO", version_str) # SoftwareVersions
# SOP common, only SOPClassUID and SOPInstanceUID are Type 1
dicomOutput.add_new(0x00200013, "IS", 1) # InstanceNumber
dicomOutput.add_new(0x00080016, "UI", "1.2.840.10008.5.1.4.1.1.66.4") # SOPClassUID, per DICOM.
dicomOutput.add_new(0x00080018, "UI", segmentationSOPInstanceUID) # SOPInstanceUID
dicomOutput.add_new(0x00080012, "DA", currentDate) # InstanceCreationDate
dicomOutput.add_new(0x00080013, "TM", currentTime) # InstanceCreationTime
# General Image module.
dicomOutput.add_new(0x00080008, "CS", ["DERIVED", "PRIMARY"]) # ImageType
dicomOutput.add_new(0x00200020, "CS", "") # PatientOrientation, forced empty
# Set content date/time
dicomOutput.ContentDate = currentDate
dicomOutput.ContentTime = currentTime
# Image Pixel
dicomOutput.add_new(0x00280002, "US", 1) # SamplesPerPixel
dicomOutput.add_new(0x00280004, "CS", "MONOCHROME2") # PhotometricInterpretation
# Common Instance Reference module
dicomOutput.add_new(0x00081115, "SQ", [Dataset()]) # ReferencedSeriesSequence
# Set the referenced SeriesInstanceUID
dicomOutput.get(0x00081115)[0].add_new(0x0020000E, "UI", safe_get(input_ds, 0x0020000E))
# Multi-frame Dimension Module
dimensionID = generate_uid(prefix=None)
dimensionOragnizationSequence = Sequence()
dimensionOragnizationSequenceDS = Dataset()
dimensionOragnizationSequenceDS.add_new(0x00209164, "UI", dimensionID) # DimensionOrganizationUID
dimensionOragnizationSequence.append(dimensionOragnizationSequenceDS)
dicomOutput.add_new(0x00209221, "SQ", dimensionOragnizationSequence) # DimensionOrganizationSequence
dimensionIndexSequence = Sequence()
dimensionIndexSequenceDS = Dataset()
dimensionIndexSequenceDS.add_new(0x00209164, "UI", dimensionID) # DimensionOrganizationUID
dimensionIndexSequenceDS.add_new(0x00209165, "AT", 0x00209153) # DimensionIndexPointer
dimensionIndexSequenceDS.add_new(0x00209167, "AT", 0x00209153) # FunctionalGroupPointer
dimensionIndexSequence.append(dimensionIndexSequenceDS)
dicomOutput.add_new(0x00209222, "SQ", dimensionIndexSequence) # DimensionIndexSequence
return dicomOutput
def create_label_segments(dcm_output, seg_labels):
""" "Creates the segments with the given labels"""
def create_label_segment(label, name):
"""Creates segment labels"""
segment = Dataset()
segment.add_new(0x00620004, "US", int(label)) # SegmentNumber
segment.add_new(0x00620005, "LO", name) # SegmentLabel
segment.add_new(0x00620009, "LO", "AI Organ Segmentation") # SegmentAlgorithmName
segment.SegmentAlgorithmType = "AUTOMATIC" # SegmentAlgorithmType
segment.add_new(0x0062000D, "US", [128, 174, 128]) # RecommendedDisplayCIELabValue
# Create SegmentedPropertyCategoryCodeSequence
segmentedPropertyCategoryCodeSequence = Sequence()
segmentedPropertyCategoryCodeSequenceDS = Dataset()
segmentedPropertyCategoryCodeSequenceDS.add_new(0x00080100, "SH", "T-D0050") # CodeValue
segmentedPropertyCategoryCodeSequenceDS.add_new(0x00080102, "SH", "SRT") # CodingSchemeDesignator
segmentedPropertyCategoryCodeSequenceDS.add_new(0x00080104, "LO", "Anatomical Structure") # CodeMeaning
segmentedPropertyCategoryCodeSequence.append(segmentedPropertyCategoryCodeSequenceDS)
segment.SegmentedPropertyCategoryCodeSequence = segmentedPropertyCategoryCodeSequence
# Create SegmentedPropertyTypeCodeSequence
segmentedPropertyTypeCodeSequence = Sequence()
segmentedPropertyTypeCodeSequenceDS = Dataset()
segmentedPropertyTypeCodeSequenceDS.add_new(0x00080100, "SH", "T-D0050") # CodeValue
segmentedPropertyTypeCodeSequenceDS.add_new(0x00080102, "SH", "SRT") # CodingSchemeDesignator
segmentedPropertyTypeCodeSequenceDS.add_new(0x00080104, "LO", "Organ") # CodeMeaning
segmentedPropertyTypeCodeSequence.append(segmentedPropertyTypeCodeSequenceDS)
segment.SegmentedPropertyTypeCodeSequence = segmentedPropertyTypeCodeSequence
return segment
segments = Sequence()
# Assumes the label starts at 1 and increment sequentially.
# TODO: This part needs to be more deteministic, e.g. with a dict.
for lb, name in enumerate(seg_labels, 1):
segment = create_label_segment(lb, name)
segments.append(segment)
dcm_output.add_new(0x00620002, "SQ", segments) # SegmentSequence
def create_frame_meta(input_ds, label, ref_instances, dimIdxVal, instance_num):
"""Creates the metadata for the each frame"""
sop_inst_uid = safe_get(input_ds, 0x00080018) # SOPInstanceUID
sourceInstanceSOPClass = safe_get(input_ds, 0x00080016) # SOPClassUID
# add frame to Referenced Image Sequence
frame_ds = Dataset()
referenceInstance = Dataset()
referenceInstance.add_new(0x00081150, "UI", sourceInstanceSOPClass) # ReferencedSOPClassUID
referenceInstance.add_new(0x00081155, "UI", sop_inst_uid) # ReferencedSOPInstanceUID
ref_instances.append(referenceInstance)
############################
# CREATE METADATA
############################
# Create DerivationImageSequence within Per-frame Functional Groups sequence
derivationImageSequence = Sequence()
derivationImage = Dataset()
# Create SourceImageSequence within DerivationImageSequence
sourceImageSequence = Sequence()
sourceImage = Dataset()
# TODO if CT multi-frame
# sourceImage.add_new(0x00081160, 'IS', inputFrameCounter + 1) # Referenced Frame Number
sourceImage.add_new(0x00081150, "UI", sourceInstanceSOPClass) # ReferencedSOPClassUID
sourceImage.add_new(0x00081155, "UI", sop_inst_uid) # ReferencedSOPInstanceUID
# Create PurposeOfReferenceCodeSequence within SourceImageSequence
purposeOfReferenceCodeSequence = Sequence()
purposeOfReferenceCode = Dataset()
purposeOfReferenceCode.add_new(0x00080100, "SH", "121322") # CodeValue
purposeOfReferenceCode.add_new(0x00080102, "SH", "DCM") # CodingSchemeDesignator
purposeOfReferenceCode.add_new(0x00080104, "LO", "Anatomical Stucture") # CodeMeaning
purposeOfReferenceCodeSequence.append(purposeOfReferenceCode)
sourceImage.add_new(0x0040A170, "SQ", purposeOfReferenceCodeSequence) # PurposeOfReferenceCodeSequence
sourceImageSequence.append(sourceImage) # AEH Beck commentout
# Create DerivationCodeSequence within DerivationImageSequence
derivationCodeSequence = Sequence()
derivationCode = Dataset()
derivationCode.add_new(0x00080100, "SH", "113076") # CodeValue
derivationCode.add_new(0x00080102, "SH", "DCM") # CodingSchemeDesignator
derivationCode.add_new(0x00080104, "LO", "Segmentation") # CodeMeaning
derivationCodeSequence.append(derivationCode)
derivationImage.add_new(0x00089215, "SQ", derivationCodeSequence) # DerivationCodeSequence
derivationImage.add_new(0x00082112, "SQ", sourceImageSequence) # SourceImageSequence
derivationImageSequence.append(derivationImage)
frame_ds.add_new(0x00089124, "SQ", derivationImageSequence) # DerivationImageSequence
# Create FrameContentSequence within Per-frame Functional Groups sequence
frameContent = Sequence()
dimensionIndexValues = Dataset()
dimensionIndexValues.add_new(0x00209157, "UL", [dimIdxVal, instance_num]) # DimensionIndexValues
frameContent.append(dimensionIndexValues)
frame_ds.add_new(0x00209111, "SQ", frameContent) # FrameContentSequence
# Create PlanePositionSequence within Per-frame Functional Groups sequence
planePositionSequence = Sequence()
imagePositionPatient = Dataset()
imagePositionPatient.add_new(0x00200032, "DS", safe_get(input_ds, 0x00200032)) # ImagePositionPatient
planePositionSequence.append(imagePositionPatient)
frame_ds.add_new(0x00209113, "SQ", planePositionSequence) # PlanePositionSequence
# Create PlaneOrientationSequence within Per-frame Functional Groups sequence
planeOrientationSequence = Sequence()
imageOrientationPatient = Dataset()
imageOrientationPatient.add_new(0x00200037, "DS", safe_get(input_ds, 0x00200037)) # ImageOrientationPatient
planeOrientationSequence.append(imageOrientationPatient)
frame_ds.add_new(0x00209116, "SQ", planeOrientationSequence) # PlaneOrientationSequence
# Create SegmentIdentificationSequence within Per-frame Functional Groups sequence
segmentIdentificationSequence = Sequence()
referencedSegmentNumber = Dataset()
# TODO lop over label and only get pixel with that value
referencedSegmentNumber.add_new(0x0062000B, "US", label) # ReferencedSegmentNumber, which label is this frame
segmentIdentificationSequence.append(referencedSegmentNumber)
frame_ds.add_new(0x0062000A, "SQ", segmentIdentificationSequence) # SegmentIdentificationSequence
return frame_ds
def set_pixel_meta(dicomOutput, input_ds):
"""Sets the pixel metadata in the DICOM object"""
dicomOutput.Rows = input_ds.Rows
dicomOutput.Columns = input_ds.Columns
dicomOutput.BitsAllocated = 1 # add_new(0x00280100, 'US', 8) # Bits allocated
dicomOutput.BitsStored = 1
dicomOutput.HighBit = 0
dicomOutput.PixelRepresentation = 0
# dicomOutput.PixelRepresentation = input_ds.PixelRepresentation
dicomOutput.SamplesPerPixel = 1
dicomOutput.ImageType = "DERIVED\\PRIMARY"
dicomOutput.ContentLabel = "SEGMENTATION"
dicomOutput.ContentDescription = ""
dicomOutput.ContentCreatorName = ""
dicomOutput.LossyImageCompression = "00"
dicomOutput.SegmentationType = "BINARY"
dicomOutput.MaximumFractionalValue = 1
dicomOutput.SharedFunctionalGroupsSequence = Sequence()
dicomOutput.PixelPaddingValue = 0
# Try to get the attributes from the original.
# Even though they are Type 1 and 2, can still be absent
dicomOutput.PixelSpacing = copy.deepcopy(input_ds.get("PixelSpacing", None))
dicomOutput.SliceThickness = input_ds.get("SliceThickness", "")
dicomOutput.RescaleSlope = 1
dicomOutput.RescaleIntercept = 0
# Set the transfer syntax
dicomOutput.is_little_endian = False # True
dicomOutput.is_implicit_VR = False # True
def segslice_from_mhd(dcm_output, seg_img, input_ds, num_labels):
"""Sets the pixel data from the input numpy image"""
if np.amax(seg_img) == 0 and np.amin(seg_img) == 0:
raise ValueError("Seg mask is not detected; all 0's.")
# add frames
out_frame_counter = 0
out_frames = Sequence()
out_pixels = None
referenceInstances = Sequence()
for img_slice in range(seg_img.shape[0]):
dimIdxVal = 0
for label in range(1, num_labels + 1):
# Determine if frame gets output
if np.count_nonzero(seg_img[img_slice, ...] == label) == 0: # no label for this frame --> skip
continue
dimIdxVal += 1
frame_meta = create_frame_meta(input_ds[img_slice], label, referenceInstances, dimIdxVal, img_slice)
out_frames.append(frame_meta)
logging.info(
"img slice {}, label {}, frame {}, img pos {}".format(
img_slice, label, out_frame_counter, safe_get(input_ds[img_slice], 0x00200032)
)
)
seg_slice = np.zeros((1, seg_img.shape[1], seg_img.shape[2]), dtype=np.bool)
seg_slice[np.expand_dims(seg_img[img_slice, ...] == label, 0)] = 1
if out_pixels is None:
out_pixels = seg_slice
else:
out_pixels = np.concatenate((out_pixels, seg_slice), axis=0)
out_frame_counter = out_frame_counter + 1
dcm_output.add_new(0x52009230, "SQ", out_frames) # PerFrameFunctionalGroupsSequence
dcm_output.NumberOfFrames = out_frame_counter
dcm_output.PixelData = np.packbits(np.flip(np.reshape(out_pixels.astype(np.bool), (-1, 8)), 1)).tostring()
dcm_output.get(0x00081115)[0].add_new(0x0008114A, "SQ", referenceInstances) # ReferencedInstanceSequence
# Create shared Functional Groups sequence
sharedFunctionalGroups = Sequence()
sharedFunctionalGroupsDS = Dataset()
planeOrientationSeq = Sequence()
planeOrientationDS = Dataset()
planeOrientationDS.add_new("0x00200037", "DS", safe_get(input_ds[0], 0x00200037)) # ImageOrientationPatient
planeOrientationSeq.append(planeOrientationDS)
sharedFunctionalGroupsDS.add_new("0x00209116", "SQ", planeOrientationSeq) # PlaneOrientationSequence
pixelMeasuresSequence = Sequence()
pixelMeasuresDS = Dataset()
pixelMeasuresDS.add_new("0x00280030", "DS", safe_get(input_ds[0], "0x00280030")) # PixelSpacing
if input_ds[0].get("SpacingBetweenSlices", ""):
pixelMeasuresDS.add_new("0x00180088", "DS", input_ds[0].get("SpacingBetweenSlices", "")) # SpacingBetweenSlices
pixelMeasuresDS.add_new("0x00180050", "DS", safe_get(input_ds[0], "0x00180050")) # SliceThickness
pixelMeasuresSequence.append(pixelMeasuresDS)
sharedFunctionalGroupsDS.add_new("0x00289110", "SQ", pixelMeasuresSequence) # PixelMeasuresSequence
sharedFunctionalGroups.append(sharedFunctionalGroupsDS)
dcm_output.add_new(0x52009229, "SQ", sharedFunctionalGroups) # SharedFunctionalGroupsSequence
# End DICOM Seg Writer temp
def test():
data_path = "/home/mqin/src/monai-deploy-app-sdk/examples/apps/ai_spleen_seg_app/input"
out_path = "/home/mqin/src/monai-deploy-app-sdk/examples/apps/ai_spleen_seg_app/output/dcm_seg_test.dcm"
files = []
loader = DICOMDataLoaderOperator()
loader._list_files(data_path, files)
study_list = loader._load_data(files)
series = study_list[0].get_all_series()[0]
dcm_to_volume_op = DICOMSeriesToVolumeOperator()
dcm_to_volume_op.prepare_series(series)
voxels = dcm_to_volume_op.generate_voxel_data(series)
metadata = dcm_to_volume_op.create_metadata(series)
image = dcm_to_volume_op.create_volumetric_image(voxels, metadata)
image_numpy = image.asnumpy()
seg_writer = DICOMSegmentationWriterOperator()
seg_writer.create_dicom_seg(image_numpy, series, out_path)
if __name__ == "__main__":
test()