diff --git a/pyannote/core/annotation.py b/pyannote/core/annotation.py index 1a934b7..f729e45 100755 --- a/pyannote/core/annotation.py +++ b/pyannote/core/annotation.py @@ -134,6 +134,7 @@ PYANNOTE_TRACK, PYANNOTE_LABEL, ) +from .base import BaseSegmentation, GappedAnnotationMixin from .json import PYANNOTE_JSON, PYANNOTE_JSON_CONTENT from .segment import Segment, SlidingWindow from .timeline import Timeline @@ -145,7 +146,7 @@ import pandas as pd -class Annotation: +class Annotation(GappedAnnotationMixin, BaseSegmentation): """Annotation Parameters @@ -175,7 +176,7 @@ def from_df( def __init__(self, uri: Optional[str] = None, modality: Optional[str] = None): - self._uri: Optional[str] = uri + super().__init__(uri) self.modality: Optional[str] = modality # sorted dictionary @@ -207,7 +208,7 @@ def uri(self, uri: str): timeline.uri = uri self._uri = uri - def _updateLabels(self): + def _update_labels(self): # list of labels that needs to be updated update = set( @@ -293,7 +294,7 @@ def itertracks( else: yield segment, track - def _updateTimeline(self): + def _update_timeline(self): self._timeline = Timeline(segments=self._tracks, uri=self.uri) self._timelineNeedsUpdate = False @@ -319,7 +320,7 @@ def get_timeline(self, copy: bool = True) -> Timeline: """ if self._timelineNeedsUpdate: - self._updateTimeline() + self._update_timeline() if copy: return self._timeline.copy() return self._timeline @@ -332,18 +333,18 @@ def __eq__(self, other: "Annotation"): Two annotations are equal if and only if their tracks and associated labels are equal. """ - pairOfTracks = itertools.zip_longest( + pair_of_tracks = itertools.zip_longest( self.itertracks(yield_label=True), other.itertracks(yield_label=True) ) - return all(t1 == t2 for t1, t2 in pairOfTracks) + return all(t1 == t2 for t1, t2 in pair_of_tracks) def __ne__(self, other: "Annotation"): """Inequality""" - pairOfTracks = itertools.zip_longest( + pair_of_tracks = itertools.zip_longest( self.itertracks(yield_label=True), other.itertracks(yield_label=True) ) - return any(t1 != t2 for t1, t2 in pairOfTracks) + return any(t1 != t2 for t1, t2 in pair_of_tracks) def __contains__(self, included: Union[Segment, Timeline]): """Inclusion @@ -559,6 +560,7 @@ def crop(self, support: Support, mode: CropMode = "intersection") -> "Annotation else: raise NotImplementedError("unsupported mode: '%s'" % mode) + # TODO: remove (already in base class) def extrude( self, removed: Support, mode: CropMode = "intersection" ) -> "Annotation": @@ -634,7 +636,7 @@ def get_overlap(self, labels: Optional[Iterable[Label]] = None) -> "Timeline": annotation.get_overlap() |------| |-----| |--------| - annotation.get_overlap(for_labels=["A", "B"]) + annotation.get_overlap(labels=["A", "B"]) |--| |--| |----| Parameters @@ -913,7 +915,7 @@ def labels(self) -> List[Label]: Sorted list of labels """ if any([lnu for lnu in self._labelNeedsUpdate.values()]): - self._updateLabels() + self._update_labels() return sorted(self._labels, key=str) def get_labels( @@ -1060,7 +1062,7 @@ def label_timeline(self, label: Label, copy: bool = True) -> Timeline: return Timeline(uri=self.uri) if self._labelNeedsUpdate[label]: - self._updateLabels() + self._update_labels() if copy: return self._labels[label].copy() diff --git a/pyannote/core/base.py b/pyannote/core/base.py new file mode 100644 index 0000000..230235d --- /dev/null +++ b/pyannote/core/base.py @@ -0,0 +1,282 @@ +from abc import ABCMeta, abstractmethod +from typing import Optional, Iterator, Tuple, Union, Dict, TYPE_CHECKING, Callable, List, Set, Iterable + +from sortedcontainers import SortedList +from typing_extensions import Self + +from pyannote.core import Segment +from pyannote.core.utils.types import Support, CropMode, ContiguousSupport, Label + +if TYPE_CHECKING: + from .timeline import Timeline + + +class BaseSegmentation(metaclass=ABCMeta): + """Abstract base class for all segmented annotations""" + + def __init__(self, uri: Optional[str] = None): + # path to (or any identifier of) segmented resource + self._uri: Optional[str] = uri + + @property + def uri(self): + return self._uri + + @uri.setter + def uri(self, uri: str): + self._uri = uri + + @abstractmethod + def __len__(self) -> int: + pass + + def __nonzero__(self): + return self.__bool__() + + @abstractmethod + def __bool__(self): + """Truthiness of the segmentation. Truthy means that it contains something + False means it's empty.""" + pass + + @abstractmethod + def __eq__(self, other: Self): + pass + + @abstractmethod + def __ne__(self, other: Self): + pass + + def __matmul__(self, other: Union['BaseSegmentation', Segment]): + return self.co_iter(other) + + @abstractmethod + def itersegments(self): + pass + + def segments_set(self) -> Set[Segment]: + # default implementation, may be overriden for better performance + return set(self.itersegments()) + + def get_timeline(self) -> 'Timeline': + from .timeline import Timeline + return Timeline(self.itersegments()) + + @abstractmethod + def update(self, other: Self) -> Self: + pass + + def co_iter(self, other: Union['BaseSegmentation', Segment]) -> Iterator[Tuple[Segment, Segment]]: + if isinstance(other, Segment): + other_segments = SortedList([other]) + else: + other_segments = SortedList(other.itersegments()) + + # TODO maybe wrap self.itersegs in a sortedlist as well? + for segment in self.itersegments(): + + # iterate over segments that starts before 'segment' ends + temp = Segment(start=segment.end, end=segment.end) + for other_segment in other_segments.irange(maximum=temp): + if segment.intersects(other_segment): + yield segment, other_segment + + @abstractmethod + def __str__(self): + pass + + @abstractmethod + def __repr__(self): + pass + + def __contains__(self, included: Union[Segment, 'BaseSegmentation']) -> bool: + # Base implementation, may be overloaded for better performance + seg_set = self.segments_set() + if isinstance(included, Segment): + return included in seg_set + elif isinstance(included, BaseSegmentation): + return seg_set.issuperset(included.segments_set()) + else: + raise ValueError("") + + @abstractmethod + def empty(self) -> Self: + pass + + @abstractmethod + def copy(self) -> Self: + pass + + @abstractmethod + def extent(self) -> Segment: + pass + + @abstractmethod + def duration(self) -> float: + pass + + @abstractmethod + def _repr_png_(self): + pass + + +# TODO: rename to SegmentSet? +class GappedAnnotationMixin(BaseSegmentation): + + @abstractmethod + def gaps_iter(self, support: Optional[Support] = None) -> Iterator[Segment]: + pass + + @abstractmethod + def gaps(self, support: Optional[Support] = None) -> 'Timeline': + pass + + def extrude(self, + removed: Support, + mode: CropMode = 'intersection') -> Self: + """Remove segments that overlap `removed` support. + + Parameters + ---------- + removed : Segment or Timeline + If `support` is a `Timeline`, its support is used. + mode : {'strict', 'loose', 'intersection'}, optional + Controls how segments that are not fully included in `removed` are + handled. 'strict' mode only removes fully included segments. 'loose' + mode removes any intersecting segment. 'intersection' mode removes + the overlapping part of any intersecting segment. + + Returns + ------- + extruded : Timeline + Extruded timeline + + Examples + -------- + + >>> timeline = Timeline([Segment(0, 2), Segment(1, 2), Segment(3, 5)]) + >>> timeline.extrude(Segment(1, 2)) + , ])> + + >>> timeline.extrude(Segment(1, 3), mode='loose') + ])> + + >>> timeline.extrude(Segment(1, 3), mode='strict') + , ])> + + """ + if isinstance(removed, Segment): + removed = Timeline([removed]) + else: + removed = removed.get_timeline() + + extent_tl = Timeline([self.extent()], uri=self.uri) + truncating_support = removed.gaps(support=extent_tl) + # loose for truncate means strict for crop and vice-versa + if mode == "loose": + mode = "strict" + elif mode == "strict": + mode = "loose" + return self.crop(truncating_support, mode=mode) + + @abstractmethod + def crop(self, + support: Support, + mode: CropMode = 'intersection', + returns_mapping: bool = False) \ + -> Union[Self, Tuple[Self, Dict[Segment, Segment]]]: + pass + + @abstractmethod + def support(self, collar: float = 0.) -> Self: + pass + + @abstractmethod + def get_overlap(self) -> 'Timeline': + pass + + +class ContiguousAnnotationMixin(BaseSegmentation): + # TODO : figure out if the return mapping still makes sense + # (propably not) + + def co_iter(self, other: Union['BaseSegmentation', Segment]) -> Iterator[Tuple[Segment, Segment]]: + if not isinstance(other, (ContiguousAnnotationMixin, Segment)): + return super().co_iter(other) + + # we're dealing with another contiguous segmentation, things can be much quicker + if isinstance(other, Segment): + other_segments = SortedList([other]) + else: + other_segments = SortedList(other.itersegments()) + my_segments = SortedList(self.itersegments()) + try: + seg_a: Segment = my_segments.pop(0) + seg_b: Segment = other_segments.pop(0) + while True: + if seg_a.intersects(seg_b): + yield seg_a, seg_b + if seg_b.end < seg_a.end: + seg_b = other_segments.pop(0) + else: + seg_a = other_segments.pop(0) + except IndexError: # exhausting any of the stacks: yielding nothing and ending + yield from () + + @abstractmethod + def crop(self, + support: ContiguousSupport, + mode: CropMode = 'intersection', + returns_mapping: bool = False) \ + -> Union[Self, Tuple[Self, Dict[Segment, Segment]]]: + # TODO: add errors messages explaining why the support isn't of the right type + pass + + @abstractmethod + def bisect(self, at: float): + pass + + +class PureSegmentationMixin(metaclass=ABCMeta): + """A segmentation containing _only_ segments""" + + # TODO: add __and__ (defaults to crop intersection, not in place), that only takes objects of Self type? + + # TODO: can actually take any BaseSegmentation for add & remove + + @abstractmethod + def crop_iter(self, + support: Support, + mode: CropMode = 'intersection', + returns_mapping: bool = False) \ + -> Iterator[Union[Tuple[Segment, Segment], Segment]]: + pass + + @abstractmethod + def add(self, segment: Segment): + pass + + @abstractmethod + def remove(self, segment: Segment): + pass + + # TODO: maybe could be in BaseSegmentation + @abstractmethod + def index(self, segment: Segment) -> int: + pass + + # TODO: maybe could be in BaseSegmentation + @abstractmethod + def overlapping(self, t: float) -> List[Segment]: + pass + + @abstractmethod + def __iter__(self) -> Iterable[Segment]: + pass + + +class AnnotatedSegmentationMixin(metaclass=ABCMeta): + + @abstractmethod + def __iter__(self) -> Iterable[Tuple[Segment, Label]]: + pass diff --git a/pyannote/core/partition.py b/pyannote/core/partition.py new file mode 100755 index 0000000..37d0a49 --- /dev/null +++ b/pyannote/core/partition.py @@ -0,0 +1,309 @@ +#!/usr/bin/env python +# encoding: utf-8 + +# The MIT License (MIT) + +# Copyright (c) 2014-2020 CNRS + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +# AUTHORS +# Hervé BREDIN - http://herve.niderb.fr +# Grant JENKS - http://www.grantjenks.com/ +# Paul LERNER + +""" +######## +Timeline +######## + +.. plot:: pyplots/timeline.py + +:class:`pyannote.core.Timeline` instances are ordered sets of non-empty +segments: + + - ordered, because segments are sorted by start time (and end time in case of tie) + - set, because one cannot add twice the same segment + - non-empty, because one cannot add empty segments (*i.e.* start >= end) + +There are two ways to define the timeline depicted above: + +.. code-block:: ipython + + In [25]: from pyannote.core import Timeline, Segment + + In [26]: timeline = Timeline() + ....: timeline.add(Segment(1, 5)) + ....: timeline.add(Segment(6, 8)) + ....: timeline.add(Segment(12, 18)) + ....: timeline.add(Segment(7, 20)) + ....: + + In [27]: segments = [Segment(1, 5), Segment(6, 8), Segment(12, 18), Segment(7, 20)] + ....: timeline = Timeline(segments=segments, uri='my_audio_file') # faster + ....: + + In [9]: for segment in timeline: + ...: print(segment) + ...: + [ 00:00:01.000 --> 00:00:05.000] + [ 00:00:06.000 --> 00:00:08.000] + [ 00:00:07.000 --> 00:00:20.000] + [ 00:00:12.000 --> 00:00:18.000] + + +.. note:: + + The optional *uri* keyword argument can be used to remember which document it describes. + +Several convenient methods are available. Here are a few examples: + +.. code-block:: ipython + + In [3]: timeline.extent() # extent + Out[3]: + + In [5]: timeline.support() # support + Out[5]: , ])> + + In [6]: timeline.duration() # support duration + Out[6]: 18 + + +See :class:`pyannote.core.Timeline` for the complete reference. +""" +from typing import (Optional, Iterable, Union, Callable, + Tuple, TYPE_CHECKING, Iterator, Dict, List) + +from sortedcontainers import SortedDict, SortedList, SortedSet +from typing_extensions import Self + +from . import Timeline +from .base import BaseSegmentation, PureSegmentationMixin, ContiguousAnnotationMixin +from .segment import Segment +from .utils.types import Support, CropMode, ContiguousSupport + +if TYPE_CHECKING: + pass + + +# ===================================================================== +# Partition class +# ===================================================================== + + +def pairwise(iterable): + "s -> (s0, s1), (s2, s3), (s4, s5), ..." + a = iter(iterable) + return zip(a, a) + + +class Partition(PureSegmentationMixin, ContiguousAnnotationMixin, BaseSegmentation): + """ + Ordered set of segments that are all contiguous. + + It has a start and a end boundary, and its segments form a continuum + between those two boundaries. Segments can be created by bisecting + the partition at certain points, and removing a segment amounts to + removing the bisections. + + Parameters + ---------- + segments : Segment iterator, optional + initial set of (non-empty) segments + boundaries: Segment, optional + uri : string, optional + name of segmented resource + + Returns + ------- + timeline : Timeline + New timeline + """ + + # TODO: need to reimplement the co_iter function to make it much faster (the whole point of having partitions) + # -> if co_iter with another partition, even faster (need to distinguish the case) + + def __init__(self, + segments: Optional[Iterable[Segment]] = None, + boundaries: Optional[Segment] = None, + uri: str = None): + segments = list(segments) if segments else [] + if not segments and boundaries is None: + raise ValueError("Cannot initialize an empty Partition without definin boundaries") + super().__init__(uri) + self.boundaries = boundaries + timeline = Timeline(segments) + if timeline.extent() not in self.boundaries: + raise ValueError(f"Segments have to be within {boundaries}") + + # automatically filling in the gaps in the segments + timeline.add(self.boundaries) + self._segments_bounds_set = SortedSet() + for (start, end) in timeline: + self._segments_bounds_set.update(start, end) + + def __len__(self) -> int: + return len(self._segments_bounds_set) - 1 + + def __nonzero__(self): + return True + + def __bool__(self): + return True + + def __eq__(self, other: 'Partition'): + return isinstance(other, Partition) and self._segments_set == other._segments_set + + def __ne__(self, other: 'Partition'): + return not other == self + + def __iter__(self) -> Iterable[Segment]: + return self.itersegments() + + def index(self, segment: Segment) -> int: + return self._segments_bounds_set.index(segment.start) + + def bisect(self, at: float): + if not self.boundaries.overlaps(at): + raise ValueError("Cannot bisect outside of partition boundaries") + + self._segments_bounds_set.add(at) + + def fuse(self, at: float): + if at in set(self.boundaries): + raise RuntimeError("Cannot fuse start or end boundary of the partition") + try: + self._segments_bounds_set.remove(at) + except KeyError: + raise RuntimeError("Cannot fuse non-existing boundary") + + def add(self, segment: Segment): + self.bisect(segment.start) + self.bisect(segment.end) + + def remove(self, segment: Segment): + if not (set(segment) & self._segments_bounds_set): + raise KeyError(f"Segment {segment} not in partition") + self._segments_bounds_set.difference_update(segment) + + def itersegments(self): + for (start, end) in pairwise(self._segments_bounds_set): + yield Segment(start, end) + + def get_timeline(self) -> 'Timeline': + return Timeline(self.itersegments(), uri=self.uri) + + def update(self, other: 'Partition') -> 'Partition': + assert other.boundaries in self.boundaries + self._segments_bounds_set |= other._segments_bounds_set + + def __str__(self): + pass + + def __repr__(self): + pass + + def empty(self) -> Self: + return Partition(None, + boundaries=self.boundaries, + uri=self.uri) + + def copy(self) -> Self: + return Partition(self.itersegments(), + boundaries=self.boundaries, + uri=self.uri) + + def extent(self) -> Segment: + return self.boundaries + + def overlapping(self, t: float) -> List[Segment]: + assert self.boundaries.overlaps(t) + end = next(self._segments_bounds_set.irange(mininum=t)) + end_idx = self._segments_bounds_set.index(end) + start = self._segments_bounds_set[end_idx - 1] + return [Segment(start, end)] + + def crop_iter(self, support: ContiguousSupport, mode: CropMode = 'intersection', returns_mapping: bool = False) \ + -> Iterator[Union[Tuple[Segment, Segment], Segment]]: + # TODO: check algo when boundaries match + if not isinstance(support, (Segment, ContiguousAnnotationMixin)): + raise ValueError(f"Only contiguous supports are allowed for cropping a {self.__class__.__name__}.") + + if not isinstance(support, Segment): + support = support.extent() + if self.extent() in support: + return self.itersegments() + + cropped_boundaries = SortedSet(self._segments_bounds_set.irange(minimum=support.start, + maximum=support.end, + inclusive=(False, False))) + + # first, yielding the first "cut" segment depending on mode + if support.start > self.extent().start: + idx_start = self._segments_bounds_set.index(cropped_boundaries[0]) + first_seg = Segment(start=self._segments_bounds_set[idx_start - 1], end=self._segments_bounds_set[idx_start]) + if mode == "intersection": + mapped_to = Segment(start=support.start, end=first_seg.end) + if returns_mapping: + yield first_seg, mapped_to + else: + yield mapped_to + elif mode == "loose": + yield first_seg + + # then, yielding "untouched" segments + for (start, end) in pairwise(cropped_boundaries): + seg = Segment(start, end) + if returns_mapping: + yield seg, seg + else: + yield seg + + # finally, yielding the last "cut" segment depending on mode + if support.end < self.extent().end: + idx_end = self._segments_bounds_set.index(cropped_boundaries[0]) + last_seg = Segment(start=self._segments_bounds_set[idx_end], end=self._segments_bounds_set[idx_end + 1]) + if mode == "intersection": + mapped_to = Segment(start=last_seg.start, end=support.end) + if returns_mapping: + yield last_seg, mapped_to + else: + yield last_seg + elif mode == "loose": + yield last_seg + + def crop(self, support: ContiguousSupport, mode: CropMode = 'intersection', returns_mapping: bool = False) \ + -> Union[Self, Tuple[Self, Dict[Segment, Segment]]]: + if mode == 'intersection' and returns_mapping: + segments, mapping = [], {} + for segment, mapped_to in self.crop_iter(support, + mode='intersection', + returns_mapping=True): + segments.append(segment) + mapping[mapped_to] = [segment] + return Partition(segments=segments, uri=self.uri), mapping + + return Partition(segments=self.crop_iter(support, mode=mode), uri=self.uri) + + def duration(self) -> float: + return self.extent().duration + + def _repr_png_(self): + pass diff --git a/pyannote/core/segment.py b/pyannote/core/segment.py index d659ebb..72aad6f 100755 --- a/pyannote/core/segment.py +++ b/pyannote/core/segment.py @@ -170,6 +170,7 @@ def __bool__(self): def __post_init__(self): """Round start and end up to SEGMENT_PRECISION precision (when required)""" + # TODO: check for start < end ? if AUTO_ROUND_TIME: object.__setattr__(self, 'start', int(self.start / SEGMENT_PRECISION + 0.5) * SEGMENT_PRECISION) object.__setattr__(self, 'end', int(self.end / SEGMENT_PRECISION + 0.5) * SEGMENT_PRECISION) @@ -217,7 +218,7 @@ def __contains__(self, other: 'Segment'): """ return (self.start <= other.start) and (self.end >= other.end) - def __and__(self, other): + def __and__(self, other: 'Segment'): """Intersection >>> segment = Segment(0, 10) @@ -239,6 +240,7 @@ def __and__(self, other): end = min(self.end, other.end) return Segment(start=start, end=end) + def intersects(self, other: 'Segment') -> bool: """Check whether two segments intersect each other @@ -274,6 +276,11 @@ def overlaps(self, t: float) -> bool: """ return self.start <= t and self.end >= t + def bisect(self, at: float) -> Tuple['Segment', 'Segment']: + if not self.overlaps(at): + raise RuntimeError("bisection time isn't contained in the segment") + return Segment(self.start, at), Segment(at, self.end) + def __or__(self, other: 'Segment') -> 'Segment': """Union diff --git a/pyannote/core/tiered.py b/pyannote/core/tiered.py new file mode 100644 index 0000000..de6b19e --- /dev/null +++ b/pyannote/core/tiered.py @@ -0,0 +1,687 @@ +#!/usr/bin/env python +# encoding: utf-8 + +# The MIT License (MIT) + +# Copyright (c) 2014-2020 CNRS + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +# AUTHORS +# Hervé BREDIN - http://herve.niderb.fr +# Paul LERNER + +""" +########## +Annotation +########## + +.. plot:: pyplots/annotation.py + +:class:`pyannote.core.Annotation` instances are ordered sets of non-empty +tracks: + + - ordered, because segments are sorted by start time (and end time in case of tie) + - set, because one cannot add twice the same track + - non-empty, because one cannot add empty track + +A track is a (support, name) pair where `support` is a Segment instance, +and `name` is an additional identifier so that it is possible to add multiple +tracks with the same support. + +To define the annotation depicted above: + +.. code-block:: ipython + + In [1]: from pyannote.core import Annotation, Segment + + In [6]: annotation = Annotation() + ...: annotation[Segment(1, 5)] = 'Carol' + ...: annotation[Segment(6, 8)] = 'Bob' + ...: annotation[Segment(12, 18)] = 'Carol' + ...: annotation[Segment(7, 20)] = 'Alice' + ...: + +which is actually a shortcut for + +.. code-block:: ipython + + In [6]: annotation = Annotation() + ...: annotation[Segment(1, 5), '_'] = 'Carol' + ...: annotation[Segment(6, 8), '_'] = 'Bob' + ...: annotation[Segment(12, 18), '_'] = 'Carol' + ...: annotation[Segment(7, 20), '_'] = 'Alice' + ...: + +where all tracks share the same (default) name ``'_'``. + +In case two tracks share the same support, use a different track name: + +.. code-block:: ipython + + In [6]: annotation = Annotation(uri='my_video_file', modality='speaker') + ...: annotation[Segment(1, 5), 1] = 'Carol' # track name = 1 + ...: annotation[Segment(1, 5), 2] = 'Bob' # track name = 2 + ...: annotation[Segment(12, 18)] = 'Carol' + ...: + +The track name does not have to be unique over the whole set of tracks. + +.. note:: + + The optional *uri* and *modality* keywords argument can be used to remember + which document and modality (e.g. speaker or face) it describes. + +Several convenient methods are available. Here are a few examples: + +.. code-block:: ipython + + In [9]: annotation.labels() # sorted list of labels + Out[9]: ['Bob', 'Carol'] + + In [10]: annotation.chart() # label duration chart + Out[10]: [('Carol', 10), ('Bob', 4)] + + In [11]: list(annotation.itertracks()) + Out[11]: [(, 1), (, 2), (, u'_')] + + In [12]: annotation.label_timeline('Carol') + Out[12]: , ])> + +See :class:`pyannote.core.Annotation` for the complete reference. +""" +from abc import abstractmethod +from pathlib import Path +from typing import Optional, Dict, Union, Iterable, List, TextIO, Tuple, Iterator, Callable, Type, Generic + +from sortedcontainers import SortedDict +from typing_extensions import Self, Any + +from pyannote.core import Annotation +from .base import BaseSegmentation, GappedAnnotationMixin, ContiguousAnnotationMixin, AnnotatedSegmentationMixin +from .partition import Partition +from .segment import Segment +from .timeline import Timeline +from .utils.generators import string_generator +from .utils.types import Label, Key, Support, TierName, CropMode, ContiguousSupport, TierItemPair + +# TODO: QUESTIONS: +# - iterator for the TieredAnnotation + +T = Type[Union[Partition, Timeline]] + + +class BaseTier(BaseSegmentation, AnnotatedSegmentationMixin, Generic[T]): + _segmentation: Union[Partition, Timeline] + + def __init__(self, name: str = None, uri: str = None): + super().__init__(uri) + self.name = name + self._segments: Dict[Segment, TierLabel] = dict() + + @abstractmethod + def __setitem__(self, segment: Segment, label: Any): + pass + + def __getitem__(self, key: Union[Segment, int]) -> Any: + # TODO: check int key + if isinstance(key, int): + key = self._segmentation.__getitem__(key) + return self._segments[key] + + def __delitem__(self, key: Union[Segment, int]): + # TODO: check int key + if isinstance(key, int): + key = self._segmentation.__getitem__(key) + + del self._segments[key] + self._segmentation.remove(key) + + def __iter__(self) -> Iterable[TierItemPair]: + """Return segments with their annotation, in chronological order""" + for segment in self._segmentation.itersegments(): + yield segment, self._segments.get(segment, None) + + def __len__(self): + """Number of segments in the tier + + >>> len(tier) # tier contains three segments + 3 + """ + return len(self._segments) + + def __bool__(self): + """Emptiness + + >>> if tier: + ... # tier is empty + ... else: + ... # tier is not empty + """ + return bool(self._segments) + + def __eq__(self, other: 'BaseTier'): + """Equality + + Two Tiers are equal if and only if their segments and their annotations are equal. + + # TODO : doc + >>> timeline1 = Timeline([Segment(0, 1), Segment(2, 3)]) + >>> timeline2 = Timeline([Segment(2, 3), Segment(0, 1)]) + >>> timeline3 = Timeline([Segment(2, 3)]) + >>> timeline1 == timeline2 + True + >>> timeline1 == timeline3 + False + """ + return self._segments == other._segments + + def __ne__(self, other: 'BaseTier'): + """Inequality""" + return self._segments != other._segments + + def itersegments(self): + return self._segmentation.itersegments() + + def empty(self) -> 'BaseTier': + """Return an empty copy + + Returns + ------- + empty : Tier + Empty timeline using the same 'uri' attribute. + + """ + return self.__class__(self.name, uri=self.uri) + + def copy(self, segment_func: Optional[Callable[[Segment], Segment]] = None) -> Self: + copy = self.__class__(name=self.name, uri=self.uri) + copy._segmentation = self._segmentation.copy() + copy._segments = self._segments.copy() + return copy + + def extent(self) -> Segment: + return self._segmentation.extent() + + def duration(self) -> float: + return self._segmentation.duration() + + def crop(self, + support: ContiguousSupport, + mode: CropMode = 'intersection', + returns_mapping: bool = False) \ + -> Union[Self, Tuple[Self, Dict[Segment, Segment]]]: + if mode in {"loose", "strict"}: + cropped_seg = self._segmentation.crop(support, mode=mode) + annotated_segments = {seg: self._segments[seg] for seg in cropped_seg} + else: # it's "intersection" + cropped_seg, mapping = self._segmentation.crop(support, mode="intersection", returns_mapping=True) + annotated_segments = {} + for seg, mapped_to in mapping.items(): + annotated_segments.update({ + seg: self._segments[mapped_seg] for mapped_seg in mapped_to + }) + + new_tier = self.__class__(self.name, uri=self.uri) + new_tier._segmentation = cropped_seg + new_tier._segments = annotated_segments + if returns_mapping and mode == "intersection": + return new_tier, mapping # noqa + else: + return new_tier + + def _repr_png_(self): + pass + + +class Tier(GappedAnnotationMixin, BaseTier[Timeline]): + """A set of chronologically-ordered and non-overlapping annotated segments""" + + # TODO: crop-safe or non-overlapping? + + def __init__(self, name: str = None, uri: str = None): + super().__init__(name, uri) + self._segmentation = Timeline() + + def __setitem__(self, segment: Segment, label: Any): + if list(self._segmentation.co_iter(segment)): + raise RuntimeError("Cannot add a segment that overlaps a pre-existing segment") + self._segmentation.add(segment) + self._segments[segment] = label + + def gaps_iter(self, support: Optional[Support] = None) -> Iterator[Segment]: + return self._segmentation.gaps_iter(support) + + def gaps(self, support: Optional[Support] = None) -> 'Timeline': + return self._segmentation.gaps(support) + + def support(self, collar: float = 0.) -> Timeline: + return self._segmentation.support(collar) + + def get_overlap(self) -> 'Timeline': + return self._segmentation.get_overlap() + + def co_iter(self, other: Union[Timeline, Segment]) -> Iterator[Tuple[Segment, Segment]]: + yield from self._segmentation.co_iter(other) + + def update(self, tier: 'Tier') -> 'Tier': + self._segmentation.update(tier._segmentation) + self._segments.update(tier._segments) + + +class PartitionTier(ContiguousAnnotationMixin, BaseTier[Partition]): + """A set of chronologically-ordered, contiguous and non-overlapping annotated segments""" + + def __init__(self, boundaries: Segment, name: str = None, uri: str = None): + super().__init__(name, uri) + self._segmentation = Partition(boundaries=boundaries) + + def __setitem__(self, segment: Segment, label: Any): + # TODO: maybe allow segment setting for segments that are not yet annotated? + if not segment in self._segmentation: + raise RuntimeError(f"Segment {segment} not contained in the tier's partition") + self._segments[segment] = label + + def __str__(self): + pass + + def __repr__(self): + pass + + def bisect(self, at: float): + self._segmentation.bisect(at) + bisected_segment = self._segmentation.overlapping(at)[0] + annot = self._segments[bisected_segment] + del self._segments[bisected_segment] + self._segments.update({seg: annot for seg in bisected_segment.bisect(at)}) + + def update(self, tier: 'BaseTier') -> 'BaseTier': + raise RuntimeError(f"A {self.__class__.__name__} cannot be updated.") + + +class TieredAnnotation(GappedAnnotationMixin, BaseSegmentation): + + def __init__(self, uri: Optional[str] = None): + super().__init__(uri) + + self._uri: Optional[str] = uri + + # sorted dictionary + # values: {tiername: tier} dictionary + self._tiers: Dict[TierName, Tier] = SortedDict() + + # timeline meant to store all annotated segments + self._timeline: Timeline = None + self._timelineNeedsUpdate: bool = True + + @classmethod + def from_textgrid(cls, textgrid: Union[str, Path, TextIO], + textgrid_format: str = "full"): + try: + from textgrid_parser import parse_textgrid + except ImportError: + raise ImportError("The dependencies used to parse TextGrid file cannot be found. " + "Please install using pyannote.core[textgrid]") + # TODO : check for tiers with duplicate names + + return parse_textgrid(textgrid, textgrid_format=textgrid_format) + + @property + def uri(self): + return self._uri + + @uri.setter + def uri(self, uri: str): + # update uri for all internal timelines + timeline = self.get_timeline(copy=False) + timeline.uri = uri + self._uri = uri + + @property + def tiers(self) -> List[Tier]: + return list(self._tiers.values()) + + @property + def tiers_names(self) -> List[TierName]: + return list(self._tiers.keys()) + + @property + def tiers_count(self): + return len(self._tiers) + + def __len__(self): + """Number of segments + + >>> len(textgrid) # textgrid contains 10 segments + 10 + """ + return sum(len(tier) for tier in self._tiers.values()) + + def __nonzero__(self): + return self.__bool__() + + def __bool__(self): + """Emptiness + # TODO : docfix + >>> if annotation: + ... # annotation is empty + ... else: + ... # annotation is not empty + """ + return len(self) > 0 + + def __iter__(self) -> Iterable[Tuple[Segment, str]]: + # TODO + pass + + def __eq__(self, other: 'TieredAnnotation'): + """Equality + + >>> annotation == other + + Two annotations are equal if and only if their tracks and associated + labels are equal. + """ + # TODO + pass + + def __ne__(self, other: 'TieredAnnotation'): + """Inequality""" + # TODO + pass + + def __contains__(self, included: Union[Segment, Timeline]): + """Inclusion + + Check whether every segment of `included` does exist in annotation. + + Parameters + ---------- + included : Segment or Timeline + Segment or timeline being checked for inclusion + + Returns + ------- + contains : bool + True if every segment in `included` exists in timeline, + False otherwise + + """ + return included in self.get_timeline(copy=False) + + def __delitem__(self, key: TierName): + """Delete a tier + # TODO : doc + """ + del self._tiers[key] + + def __getitem__(self, key: TierName) -> Tier: + """Get a tier + + >>> praat_tier = annotation[tiername] + + """ + + return self._tiers[key] + + def __setitem__(self, key: Key, label: Label): + pass # TODO : set a tier + + def __repr__(self): + pass # TODO + + def __str__(self): + """Human-friendly representation""" + # TODO: use pandas.DataFrame + return "\n".join(["%s %s %s" % (s, t, l) + for s, t, l in self.itertracks(yield_label=True)]) + + def empty(self) -> 'Annotation': + """Return an empty copy + + Returns + ------- + empty : Annotation + Empty annotation using the same 'uri' and 'modality' attributes. + + """ + return self.__class__(uri=self.uri) + + def itersegments(self): + """Iterate over segments (in chronological order) + + >>> for segment in annotation.itersegments(): + ... # do something with the segment + + See also + -------- + :class:`pyannote.core.Segment` describes how segments are sorted. + """ + for tier in self.tiers: + yield from tier + + def to_textgrid(self, file: Union[str, Path, TextIO]): + pass + + def to_annotation(self, modality: Optional[str] = None) -> Annotation: + """Convert to an annotation object. The new annotation's labels + are the tier names of each segments. In short, the segment's + # TODO : visual example + + Parameters + ---------- + modality: optional str + + Returns + ------- + annotation : Annotation + A new Annotation Object + + Note + ---- + If you want to convert part of a `PraatTextGrid` to an `Annotation` object + while keeping the segment's labels, you can use the tier's + :func:`~pyannote.textgrid.PraatTier.to_annotation` + """ + annotation = Annotation(uri=self.uri, modality=modality) + for tier_name, tier in self._tiers.items(): + for segment, _ in tier: + annotation[segment] = tier_name + return annotation + + def get_timeline(self, copy: bool = True) -> Timeline: + """Get timeline made of all annotated segments + + Parameters + ---------- + copy : bool, optional + Defaults (True) to returning a copy of the internal timeline. + Set to False to return the actual internal timeline (faster). + + Returns + ------- + timeline : Timeline + Timeline made of all annotated segments. + + Note + ---- + In case copy is set to False, be careful **not** to modify the returned + timeline, as it may lead to weird subsequent behavior of the annotation + instance. + + """ + if self._timelineNeedsUpdate: + self._update_timeline() + if copy: + return self._timeline.copy() + return self._timeline + + def crop(self, support: Support, mode: CropMode = 'intersection') \ + -> 'TieredAnnotation': + """Crop textgrid to new support + + Parameters + ---------- + support : Segment or Timeline + If `support` is a `Timeline`, its support is used. + mode : {'strict', 'loose', 'intersection'}, optional + Controls how segments that are not fully included in `support` are + handled. 'strict' mode only keeps fully included segments. 'loose' + mode keeps any intersecting segment. 'intersection' mode keeps any + intersecting segment but replace them by their actual intersection. + + Returns + ------- + cropped : TieredAnnotation + Cropped textgrid + """ + new_tg = TieredAnnotation(self.uri) + for tier_name, tier in self._tiers.items(): + new_tg._tiers[tier_name] = tier.crop(support) + return new_tg + + def copy(self) -> 'TieredAnnotation': + """Get a copy of the annotation + + Returns + ------- + annotation : TieredAnnotation + Copy of the textgrid + """ + + # create new empty annotation + # TODO + pass + + def update(self, textgrid: 'TieredAnnotation', copy: bool = False) \ + -> 'TieredAnnotation': + """Add every track of an existing annotation (in place) + + Parameters + ---------- + annotation : Annotation + Annotation whose tracks are being added + copy : bool, optional + Return a copy of the annotation. Defaults to updating the + annotation in-place. + + Returns + ------- + self : Annotation + Updated annotation + + Note + ---- + Existing tracks are updated with the new label. + """ + + result = self.copy() if copy else self + + # TODO + + return result + + def support(self, collar: float = 0.) -> 'TieredAnnotation': + # TODO + """Annotation support + + The support of an annotation is an annotation where contiguous tracks + with same label are merged into one unique covering track. + + A picture is worth a thousand words:: + + collar + |---| + + annotation + |--A--| |--A--| |-B-| + |-B-| |--C--| |----B-----| + + annotation.support(collar) + |------A------| |------B------| + |-B-| |--C--| + + Parameters + ---------- + collar : float, optional + Merge tracks with same label and separated by less than `collar` + seconds. This is why 'A' tracks are merged in above figure. + Defaults to 0. + + Returns + ------- + support : Annotation + Annotation support + + Note + ---- + Track names are lost in the process. + """ + + generator = string_generator() + + # initialize an empty annotation + # with same uri and modality as original + support = self.empty() + for label in self.labels(): + + # get timeline for current label + timeline = self.label_timeline(label, copy=True) + + # fill the gaps shorter than collar + timeline = timeline.support(collar) + + # reconstruct annotation with merged tracks + for segment in timeline.support(): + support[segment, next(generator)] = label + + return support + + def gaps_iter(self, support: Optional[Support] = None) -> Iterator[Segment]: + pass + + def gaps(self, support: Optional[Support] = None) -> 'Timeline': + pass + + def get_overlap(self) -> 'Timeline': + pass + + def extent(self) -> Segment: + pass + + def crop_iter(self, support: Support, mode: CropMode = 'intersection', returns_mapping: bool = False) -> Iterator[ + Union[Tuple[Segment, Segment], Segment]]: + pass + + def duration(self) -> float: + pass + + def _repr_png_(self): + pass + + def _repr_png(self): + """IPython notebook support + + See also + -------- + :mod:`pyannote.core.notebook` + """ + + from .notebook import repr_annotation + return repr_annotation(self) diff --git a/pyannote/core/timeline.py b/pyannote/core/timeline.py index 57114be..8aeb093 100755 --- a/pyannote/core/timeline.py +++ b/pyannote/core/timeline.py @@ -93,13 +93,14 @@ TextIO, Tuple, TYPE_CHECKING, Iterator, Dict, Text) from sortedcontainers import SortedList +from typing_extensions import Self from . import PYANNOTE_URI, PYANNOTE_SEGMENT +from .base import GappedAnnotationMixin, BaseSegmentation from .json import PYANNOTE_JSON, PYANNOTE_JSON_CONTENT from .segment import Segment from .utils.types import Support, Label, CropMode - # this is a moderately ugly way to import `Annotation` to the namespace # without causing some circular imports : # https://stackoverflow.com/questions/39740632/python-type-hinting-without-cyclic-imports @@ -113,7 +114,7 @@ # ===================================================================== -class Timeline: +class Timeline(GappedAnnotationMixin, BaseSegmentation): """ Ordered set of segments. @@ -143,6 +144,7 @@ def from_df(cls, df: 'pd.DataFrame', uri: Optional[str] = None) -> 'Timeline': def __init__(self, segments: Optional[Iterable[Segment]] = None, uri: str = None): + super().__init__(uri) if segments is None: segments = () @@ -155,12 +157,6 @@ def __init__(self, # sorted list of segments (used for sorted iteration) self.segments_list_ = SortedList(segments_set) - # sorted list of (possibly redundant) segment boundaries - boundaries = (boundary for segment in segments_set for boundary in segment) - self.segments_boundaries_ = SortedList(boundaries) - - # path to (or any identifier of) segmented resource - self.uri: str = uri def __len__(self): """Number of segments @@ -203,7 +199,7 @@ def __getitem__(self, k: int) -> Segment: """ return self.segments_list_[k] - def __eq__(self, other: 'Timeline'): + def __eq__(self, other: Self): """Equality Two timelines are equal if and only if their segments are equal. @@ -218,7 +214,7 @@ def __eq__(self, other: 'Timeline'): """ return self.segments_set_ == other.segments_set_ - def __ne__(self, other: 'Timeline'): + def __ne__(self, other: Self): """Inequality""" return self.segments_set_ != other.segments_set_ @@ -270,11 +266,6 @@ def add(self, segment: Segment) -> 'Timeline': segments_set_.add(segment) self.segments_list_.add(segment) - - segments_boundaries_ = self.segments_boundaries_ - segments_boundaries_.add(segment.start) - segments_boundaries_.add(segment.end) - return self def remove(self, segment: Segment) -> 'Timeline': @@ -303,10 +294,6 @@ def remove(self, segment: Segment) -> 'Timeline': self.segments_list_.remove(segment) - segments_boundaries_ = self.segments_boundaries_ - segments_boundaries_.remove(segment.start) - segments_boundaries_.remove(segment.end) - return self def discard(self, segment: Segment) -> 'Timeline': @@ -321,7 +308,7 @@ def discard(self, segment: Segment) -> 'Timeline': def __ior__(self, timeline: 'Timeline') -> 'Timeline': return self.update(timeline) - def update(self, timeline: Segment) -> 'Timeline': + def update(self, timeline: 'Timeline') -> 'Timeline': """Add every segments of an existing timeline (in place) Parameters @@ -348,16 +335,12 @@ def update(self, timeline: Segment) -> 'Timeline': # sorted list of segments (used for sorted iteration) self.segments_list_ = SortedList(segments_set) - # sorted list of (possibly redundant) segment boundaries - boundaries = (boundary for segment in segments_set for boundary in segment) - self.segments_boundaries_ = SortedList(boundaries) - return self - def __or__(self, timeline: 'Timeline') -> 'Timeline': + def __or__(self, timeline: Union['Timeline', Segment]) -> 'Timeline': return self.union(timeline) - def union(self, timeline: 'Timeline') -> 'Timeline': + def union(self, timeline: Union['Timeline', Segment]) -> 'Timeline': """Create new timeline made of union of segments Parameters @@ -375,10 +358,13 @@ def union(self, timeline: 'Timeline') -> 'Timeline': This does the same as timeline.update(...) except it returns a new timeline, and the original one is not modified. """ + if isinstance(timeline, Segment): + timeline = Timeline([timeline]) + segments = self.segments_set_ | timeline.segments_set_ return Timeline(segments=segments, uri=self.uri) - def co_iter(self, other: 'Timeline') -> Iterator[Tuple[Segment, Segment]]: + def co_iter(self, other: Union['Timeline', Segment]) -> Iterator[Tuple[Segment, Segment]]: """Iterate over pairs of intersecting segments >>> timeline1 = Timeline([Segment(0, 2), Segment(1, 2), Segment(3, 4)]) @@ -399,6 +385,8 @@ def co_iter(self, other: 'Timeline') -> Iterator[Tuple[Segment, Segment]]: iterable : (Segment, Segment) iterable Yields pairs of intersecting segments in chronological order. """ + if isinstance(other, Segment): + other = Timeline([other]) for segment in self.segments_list_: @@ -424,6 +412,7 @@ def crop_iter(self, raise ValueError("Mode must be one of 'loose', 'strict', or " "'intersection'.") + # TODO: make more generic using SegmentSet if not isinstance(support, (Segment, Timeline)): raise TypeError("Support must be a Segment or a Timeline.") @@ -579,6 +568,7 @@ def get_overlap(self) -> 'Timeline': overlaps_tl.add(s1 & s2) return overlaps_tl.support() + # TODO: remove (already in base class) def extrude(self, removed: Support, mode: CropMode = 'intersection') -> 'Timeline': @@ -790,9 +780,8 @@ def extent(self) -> Segment: """ if self.segments_set_: - segments_boundaries_ = self.segments_boundaries_ - start = segments_boundaries_[0] - end = segments_boundaries_[-1] + start = self.segments_list_[0].start + end = self.segments_list_[-1].end return Segment(start=start, end=end) return Segment(start=0.0, end=0.0) diff --git a/pyannote/core/utils/loaders.py b/pyannote/core/utils/loaders.py new file mode 100644 index 0000000..7abdb35 --- /dev/null +++ b/pyannote/core/utils/loaders.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python +# encoding: utf-8 + +# The MIT License (MIT) + +# Copyright (c) 2016-2020 CNRS + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +# AUTHORS +# Hervé BREDIN - http://herve.niderb.fr + +from typing import Text + +import pandas as pd + +from pyannote.core import Segment, Timeline, Annotation + +DatabaseName = Text +PathTemplate = Text + + +def load_rttm(file_rttm): + """Load RTTM file + + Parameter + --------- + file_rttm : `str` + Path to RTTM file. + + Returns + ------- + annotations : `dict` + Speaker diarization as a {uri: pyannote.core.Annotation} dictionary. + """ + + names = [ + "NA1", + "uri", + "NA2", + "start", + "duration", + "NA3", + "NA4", + "speaker", + "NA5", + "NA6", + ] + dtype = {"uri": str, "start": float, "duration": float, "speaker": str} + data = pd.read_csv( + file_rttm, + names=names, + dtype=dtype, + delim_whitespace=True, + keep_default_na=False, + ) + + annotations = dict() + for uri, turns in data.groupby("uri"): + annotation = Annotation(uri=uri) + for i, turn in turns.iterrows(): + segment = Segment(turn.start, turn.start + turn.duration) + annotation[segment, i] = turn.speaker + annotations[uri] = annotation + + return annotations + + +def load_mdtm(file_mdtm): + """Load MDTM file + + Parameter + --------- + file_mdtm : `str` + Path to MDTM file. + + Returns + ------- + annotations : `dict` + Speaker diarization as a {uri: pyannote.core.Annotation} dictionary. + """ + + names = ["uri", "NA1", "start", "duration", "NA2", "NA3", "NA4", "speaker"] + dtype = {"uri": str, "start": float, "duration": float, "speaker": str} + data = pd.read_csv( + file_mdtm, + names=names, + dtype=dtype, + delim_whitespace=True, + keep_default_na=False, + ) + + annotations = dict() + for uri, turns in data.groupby("uri"): + annotation = Annotation(uri=uri) + for i, turn in turns.iterrows(): + segment = Segment(turn.start, turn.start + turn.duration) + annotation[segment, i] = turn.speaker + annotations[uri] = annotation + + return annotations + + +def load_uem(file_uem): + """Load UEM file + + Parameter + --------- + file_uem : `str` + Path to UEM file. + + Returns + ------- + timelines : `dict` + Evaluation map as a {uri: pyannote.core.Timeline} dictionary. + """ + + names = ["uri", "NA1", "start", "end"] + dtype = {"uri": str, "start": float, "end": float} + data = pd.read_csv(file_uem, names=names, dtype=dtype, delim_whitespace=True) + + timelines = dict() + for uri, parts in data.groupby("uri"): + segments = [Segment(part.start, part.end) for i, part in parts.iterrows()] + timelines[uri] = Timeline(segments=segments, uri=uri) + + return timelines + + +def load_lab(path, uri: str = None) -> Annotation: + """Load LAB file + + Parameter + --------- + file_lab : `str` + Path to LAB file + + Returns + ------- + data : `pyannote.core.Annotation` + """ + + names = ["start", "end", "label"] + dtype = {"start": float, "end": float, "label": str} + data = pd.read_csv(path, names=names, dtype=dtype, delim_whitespace=True) + + annotation = Annotation(uri=uri) + for i, turn in data.iterrows(): + segment = Segment(turn.start, turn.end) + annotation[segment, i] = turn.label + + return annotation + + +def load_lst(file_lst): + """Load LST file + + LST files provide a list of URIs (one line per URI) + + Parameter + --------- + file_lst : `str` + Path to LST file. + + Returns + ------- + uris : `list` + List or uris + """ + + with open(file_lst, mode="r") as fp: + lines = fp.readlines() + return [line.strip() for line in lines] + + +def load_mapping(mapping_txt): + """Load mapping file + + Parameter + --------- + mapping_txt : `str` + Path to mapping file + + Returns + ------- + mapping : `dict` + {1st field: 2nd field} dictionary + """ + + with open(mapping_txt, mode="r") as fp: + lines = fp.readlines() + + mapping = dict() + for line in lines: + key, value, *left = line.strip().split() + mapping[key] = value + + return mapping diff --git a/pyannote/core/utils/types.py b/pyannote/core/utils/types.py index 0105a16..e10f8f2 100644 --- a/pyannote/core/utils/types.py +++ b/pyannote/core/utils/types.py @@ -3,11 +3,14 @@ from typing_extensions import Literal Label = Hashable -Support = Union['Segment', 'Timeline'] +Support = Union['Segment', 'SegmentSet'] +ContiguousSupport = Union['Segment', 'ContiguousAnnotationMixin'] LabelGeneratorMode = Literal['int', 'string'] LabelGenerator = Union[LabelGeneratorMode, Iterator[Label]] TrackName = Union[str, int] Key = Union['Segment', Tuple['Segment', TrackName]] +TierName = str +TierItemPair = Tuple[TierName, 'Segment'] Resource = Union['Segment', 'Timeline', 'SlidingWindowFeature', 'Annotation'] CropMode = Literal['intersection', 'loose', 'strict'] diff --git a/setup.py b/setup.py index f10a834..e55bbbe 100755 --- a/setup.py +++ b/setup.py @@ -65,6 +65,9 @@ "Topic :: Scientific/Engineering" ], extras_require={ + 'textgrid': [ + "git+ssh://git@github.com/hadware/textgrid-parser.git" + ], 'testing': ['pytest', 'pandas >= 0.17.1', 'flake8==3.7.9'],