Skip to content

Project

Project

Source code in pyflp/project.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
class Project:
    """In-memory representation of an FL Studio project (FLP).

    Holds the event list, the `Misc` settings object and the lists of
    patterns, filters, channels, arrangements, inserts and controllers, and
    can serialise itself to a byte stream, to a file or to a ZIP package.
    """

    # Fixed attribute set; every slot is initialised in `__init__`.
    __slots__ = (
        "events",
        "save_path",
        "misc",
        "patterns",
        "filters",
        "channels",
        "arrangements",
        "inserts",
        "controllers",
        "_unparsed_events",
    )

    def __init__(self) -> None:
        """Creates an empty project: no events, no objects and no save path."""
        self.events: List[EventType] = []
        # Stays `None` until a file location is known; `save()` and
        # `create_zip()` need it when called without an explicit path.
        self.save_path: Optional[Path] = None
        self.misc = Misc()
        self.patterns: List[Pattern] = []
        self.filters: List[Filter] = []
        self.channels: List[Channel] = []
        self.arrangements: List[Arrangement] = []
        self.inserts: List[Insert] = []
        self.controllers: List[Controller] = []
        self._unparsed_events: List[EventType] = []

    def __repr__(self) -> str:
        return "<Project {}, {}, {}, {}, {}, {}, {}>".format(
            f'version="{self.misc.version}"',
            f"{len(self.channels)} channels",
            f"{len(self.patterns)} patterns",
            f"{len(self.used_insert_nums())} inserts used",
            f"{self.slots_used()} insert slots occupied",
            f"{len(self.controllers)} controllers",
            f"{len(self.events)} events ({len(self._unparsed_events)} unparsed)",
        )

    # * Utilities
    def used_insert_nums(self) -> Set[int]:
        """Returns a set of `Insert` indexes to which channels are routed.

        Channels whose `target_insert` is `None` are ignored.
        """
        targets = (channel.target_insert for channel in self.channels)
        return {target for target in targets if target is not None}

    def slots_used(self) -> int:
        """Returns the total number of slots used across all inserts."""
        return sum(
            1 for insert in self.inserts for slot in insert.slots if slot.is_used()
        )

    def create_zip(self, path: Union[str, Path] = "") -> None:
        """Exports a ZIP looped package of an FLP.

        The file at `path` is added to the archive as the FLP, followed by
        every channel sample that can be found on disk. The archive is written
        next to the FLP, with the same name and a ``.zip`` suffix. Samples which
        cannot be located are skipped with a warning. Importing stock samples
        works only on Windows.

        Args:
            path: The path to save the ZIP to. Defaults to `Project.save_path`.

        Raises:
            AttributeError: When path is default and Project was created from a stream
            FileExistsError: When the ZIP file already exists.
        """

        # Init
        if isinstance(path, str) and not path:
            default_path = getattr(self, "save_path", None)
            if default_path is None:
                raise AttributeError(
                    "Optional argument 'path' cannot be default to create "
                    "a ZIP for a Project object created through a stream."
                )
            path = default_path
        flp_path = Path(path)
        zip_path = os.fspath(flp_path.with_suffix(".zip"))

        # Get FL Studio install dir for copying stock samples. The lookup is
        # best-effort: off Windows, or without an Image-Line directory, stock
        # samples are simply skipped later on instead of failing here.
        fl_dir: Optional[Path] = None
        program_files = os.environ.get("PROGRAMFILES")
        if program_files:
            il = Path(program_files) / "Image-Line"
            if il.is_dir():
                ignored = (il / "FL Studio ASIO", il / "Shared")
                fl_dir = next(
                    (
                        entry
                        for entry in il.iterdir()
                        if entry.is_dir() and entry not in ignored
                    ),
                    None,
                )

        system = platform.system()
        stock_marker = r"%FLStudioFactoryData%"
        added: Set[Path] = set()

        # Files are written with an explicit archive name, so the process-wide
        # working directory never has to be changed.
        with zipfile.ZipFile(zip_path, "x") as archive:
            # Add FLP to ZIP
            archive.write(str(flp_path), arcname=flp_path.name)

            # Find sampler and audio channels
            for channel in self.channels:
                sample_path = channel.sample_path
                if not sample_path:
                    continue

                # Resolve locations of stock samples
                if stock_marker in sample_path:
                    if system != "Windows":
                        warnings.warn(
                            f"Cannot import stock samples from {system}. "
                            "Only Windows is supported currently.",
                            RuntimeWarning,
                        )
                        continue
                    if fl_dir is None:
                        warnings.warn(
                            "Importing stock samples requires FL Studio "
                            "installed in the 'Image-Line' directory under "
                            f"%PROGRAMFILES%. Skipping sample {sample_path}"
                        )
                        continue
                    sample_path = sample_path.replace(stock_marker, str(fl_dir), 1)

                # Relative sample paths are looked up next to the FLP
                sample = Path(sample_path)
                if not sample.is_absolute():
                    sample = flp_path.parent / sample
                if not sample.is_file():
                    warnings.warn(f"File {sample_path} doesn't exist")
                    continue

                # A sample shared by several channels is stored only once
                resolved = sample.resolve()
                if resolved in added:
                    continue
                added.add(resolved)

                # Add samples to ZIP
                archive.write(str(sample), arcname=sample.name)

    # * Save logic
    def __save_state(self) -> "List[EventType]":
        """Calls `_save` for all `_FLObject`s and returns a sorted list of the received events

        The events of `misc`, the unparsed events, the events of every filter,
        channel, pattern, arrangement, insert and controller and the insert
        params events already present in `self.events` are collected, in that
        order. The resulting list also replaces `self.events`.

        Returns:
            List[Event]: A list of events sorted by `Event.index`
        """
        from pyflp._flobject import _FLObject

        # Misc
        event_store: List[EventType] = list(self.misc._save())

        # Unparsed/unimplemented events
        event_store.extend(self._unparsed_events)

        for param in (
            "filters",
            "channels",
            "patterns",
            "arrangements",
            "inserts",
            "controllers",
        ):
            objs: List[_FLObject] = getattr(self, param)
            for obj in objs:
                event_store.extend(obj._save())

        # Insert params events are carried over from the current event list
        event_store.extend(e for e in self.events if e.id_ == InsertParamsEvent.ID)

        # ? Assign event store to self.events
        self.events = event_store

        # Sort the events in ascending order w.r.t index. The sort is in place,
        # so `self.events` (the same list object) ends up sorted as well.
        event_store.sort(key=lambda event: event.index)
        return event_store

    def get_stream(self) -> bytes:
        """Converts the list of events received from `self.__save_state()`
        and headers into a single stream. Typically used directly when
        `Project` was parsed from a stream, i.e. `save_path` is not set.

        Returns:
            bytes: The stream. Used by `save()`.

        Raises:
            DataCorruptionDetectedError: When the number of bytes written for
                the events differs from the sum of their `size` attributes.
        """

        # Save event state
        event_store = self.__save_state()

        # Begin the save process: Stream init
        stream = io.BytesIO()

        # Header: magic, header length (6) and the three 2-byte fields
        header = (
            b"FLhd"
            + int.to_bytes(6, 4, "little")
            + self.misc.format.to_bytes(2, "little", signed=True)
            + self.misc.channel_count.to_bytes(2, "little")
            + self.misc.ppq.to_bytes(2, "little")
        )
        stream.write(header)

        # Data chunk header
        data = BytesIOEx(b"FLdt")
        data.seek(4)

        # Calculate chunk length
        chunklen = sum(ev.size for ev in event_store)
        data.write_I(chunklen)

        # Dump events
        for ev in event_store:
            data.write(ev.to_raw())
        # The 8 skipped bytes are the chunk magic plus the length field
        # (presumably 4 bytes, as written by `write_I`).
        if (data.tell() - 8) != chunklen:
            raise DataCorruptionDetectedError

        # BytesIOEx -> bytes
        data.seek(0)
        stream.write(data.read())
        return stream.getvalue()

    # TODO Implement saving for ZIP looped packages
    def save(self, save_path: Union[Path, str] = "") -> None:
        """Saves `Project` to disk.

        When `save_path` is left at its default, the project is written to
        `Project.save_path` and the file previously stored there is kept as a
        backup with an additional ``.bak`` suffix. The stream is generated
        before anything on disk is touched, so a serialisation error leaves
        existing files untouched.

        Args:
            save_path (Union[Path, str], optional): File path to save to.

        Raises:
            AttributeError: When `Project.save_path` \
            doesn't exist and `save_path` is not set.
            OSError: Exception which caused the write to fail, \
            most probably a permission/file-in-use error.
        """

        # Type checking and init; "" and "." both mean "use Project.save_path"
        target = Path(save_path)
        use_default = target == Path(".")
        if use_default:
            own_path = getattr(self, "save_path", None)
            if own_path is None:
                raise AttributeError(
                    "Optional argument 'path' cannot be default when Parser "
                    "was initialised from a stream. Use get_stream() instead."
                )
            target = Path(own_path)

        # Serialise first: if this raises, no file has been renamed or opened
        stream = self.get_stream()

        # Keep the previous file as a backup when overwriting the project file
        backup: Optional[Path] = None
        if use_default and target.exists():
            backup = target.with_suffix(f"{target.suffix}.bak")
            if backup.exists():
                backup.unlink()
            target.rename(backup)

        try:
            with open(target, "wb") as fp:
                fp.write(stream)
        except OSError:
            # Rollback: drop the partial file and restore the backup
            if target.exists():
                target.unlink()
            if backup is not None:
                backup.rename(target)
            raise

__slots__ = ('events', 'save_path', 'misc', 'patterns', 'filters', 'channels', 'arrangements', 'inserts', 'controllers', '_unparsed_events') class-attribute

arrangements: List[Arrangement] = [] instance-attribute

channels: List[Channel] = [] instance-attribute

controllers: List[Controller] = [] instance-attribute

events: List[EventType] = [] instance-attribute

filters: List[Filter] = [] instance-attribute

inserts: List[Insert] = [] instance-attribute

misc = Misc() instance-attribute

patterns: List[Pattern] = [] instance-attribute

save_path: Optional[Path] = None instance-attribute

__init__()

Source code in pyflp/project.py
52
53
54
55
56
57
58
59
60
61
62
def __init__(self) -> None:
    """Set up an empty project: every collection starts out empty."""
    # Location on disk; nothing is known about it yet.
    self.save_path: Optional[Path] = None

    # Event storage
    self.events: List[EventType] = []
    self._unparsed_events: List[EventType] = []

    # Project-wide settings object
    self.misc = Misc()

    # Object collections
    self.filters: List[Filter] = []
    self.channels: List[Channel] = []
    self.patterns: List[Pattern] = []
    self.arrangements: List[Arrangement] = []
    self.inserts: List[Insert] = []
    self.controllers: List[Controller] = []

__repr__()

Source code in pyflp/project.py
64
65
66
67
68
69
70
71
72
73
def __repr__(self) -> str:
    """Summarise the project: version plus counts of its main collections."""
    version = f'version="{self.misc.version}"'
    channel_count = f"{len(self.channels)} channels"
    pattern_count = f"{len(self.patterns)} patterns"
    inserts_used = f"{len(self.used_insert_nums())} inserts used"
    slots_occupied = f"{self.slots_used()} insert slots occupied"
    controller_count = f"{len(self.controllers)} controllers"
    event_count = f"{len(self.events)} events ({len(self._unparsed_events)} unparsed)"
    return "<Project {}, {}, {}, {}, {}, {}, {}>".format(
        version,
        channel_count,
        pattern_count,
        inserts_used,
        slots_occupied,
        controller_count,
        event_count,
    )

__save_state()

Calls _save for all _FLObjects and returns a sorted list of the received events

Returns:

Type Description
List[EventType]

List[Event]: A list of events sorted by Event.index

Source code in pyflp/project.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def __save_state(self) -> List[EventType]:
    """Gather the events of every `_FLObject` via `_save` and sort them.

    The collected list replaces `self.events`.

    Returns:
        List[Event]: The events, ordered by `Event.index`
    """
    from pyflp._flobject import _FLObject

    # Start with the misc events, followed by the unparsed/unimplemented ones
    collected: List[EventType] = list(self.misc._save())
    collected += self._unparsed_events

    # Then every object of every collection, in this fixed order
    for attr in (
        "filters",
        "channels",
        "patterns",
        "arrangements",
        "inserts",
        "controllers",
    ):
        flobjects: List[_FLObject] = getattr(self, attr)
        for flobject in flobjects:
            collected.extend(flobject._save())

    # Finally the insert params events found in the current event list
    collected.extend(
        event for event in self.events if event.id_ == InsertParamsEvent.ID
    )

    # `self.events` and the return value are the same list, sorted in place
    self.events = collected
    collected.sort(key=lambda event: event.index)
    return collected

create_zip(path='')

Exports a ZIP looped package of an FLP. Importing stock samples works only on Windows.

Parameters:

Name Type Description Default
path Union[str, Path]

The path to save the ZIP to.

''

Raises:

Type Description
AttributeError

When path is default and Project was created from a stream

Source code in pyflp/project.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def create_zip(self, path: Union[str, Path] = "") -> None:
    """Exports a ZIP looped package of an FLP.

    The file at `path` is added to the archive as the FLP, followed by
    every channel sample that can be found on disk. The archive is written
    next to the FLP, with the same name and a ``.zip`` suffix. Samples which
    cannot be located are skipped with a warning. Importing stock samples
    works only on Windows.

    Args:
        path: The path to save the ZIP to. Defaults to `Project.save_path`.

    Raises:
        AttributeError: When path is default and Project was created from a stream
        FileExistsError: When the ZIP file already exists.
    """

    # Init
    if isinstance(path, str) and not path:
        default_path = getattr(self, "save_path", None)
        if default_path is None:
            raise AttributeError(
                "Optional argument 'path' cannot be default to create "
                "a ZIP for a Project object created through a stream."
            )
        path = default_path
    flp_path = Path(path)
    zip_path = os.fspath(flp_path.with_suffix(".zip"))

    # Get FL Studio install dir for copying stock samples. The lookup is
    # best-effort: off Windows, or without an Image-Line directory, stock
    # samples are simply skipped later on instead of failing here.
    fl_dir: Optional[Path] = None
    program_files = os.environ.get("PROGRAMFILES")
    if program_files:
        il = Path(program_files) / "Image-Line"
        if il.is_dir():
            ignored = (il / "FL Studio ASIO", il / "Shared")
            fl_dir = next(
                (
                    entry
                    for entry in il.iterdir()
                    if entry.is_dir() and entry not in ignored
                ),
                None,
            )

    system = platform.system()
    stock_marker = r"%FLStudioFactoryData%"
    added = set()

    # Files are written with an explicit archive name, so the process-wide
    # working directory never has to be changed.
    with zipfile.ZipFile(zip_path, "x") as archive:
        # Add FLP to ZIP
        archive.write(str(flp_path), arcname=flp_path.name)

        # Find sampler and audio channels
        for channel in self.channels:
            sample_path = channel.sample_path
            if not sample_path:
                continue

            # Resolve locations of stock samples
            if stock_marker in sample_path:
                if system != "Windows":
                    warnings.warn(
                        f"Cannot import stock samples from {system}. "
                        "Only Windows is supported currently.",
                        RuntimeWarning,
                    )
                    continue
                if fl_dir is None:
                    warnings.warn(
                        "Importing stock samples requires FL Studio "
                        "installed in the 'Image-Line' directory under "
                        f"%PROGRAMFILES%. Skipping sample {sample_path}"
                    )
                    continue
                sample_path = sample_path.replace(stock_marker, str(fl_dir), 1)

            # Relative sample paths are looked up next to the FLP
            sample = Path(sample_path)
            if not sample.is_absolute():
                sample = flp_path.parent / sample
            if not sample.is_file():
                warnings.warn(f"File {sample_path} doesn't exist")
                continue

            # A sample shared by several channels is stored only once
            resolved = sample.resolve()
            if resolved in added:
                continue
            added.add(resolved)

            # Add samples to ZIP
            archive.write(str(sample), arcname=sample.name)

get_stream()

Converts the list of events received from self.__save_state() and the headers into a single stream. Typically used directly when Project was parsed from a stream, i.e. save_path is not set.

Returns:

Name Type Description
bytes bytes

The stream. Used by save().

Source code in pyflp/project.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def get_stream(self) -> bytes:
    """Serialise the project: file header followed by the data chunk.

    The events come from `self.__save_state()`. Typically called directly
    when `Project` was parsed from a stream, i.e. `save_path` is not set.

    Returns:
        bytes: The stream. Used by `save()`.
    """

    # Collect the current event state
    events = self.__save_state()

    # File header
    misc = self.misc
    header = b"".join(
        (
            b"FLhd",
            int.to_bytes(6, 4, "little"),
            misc.format.to_bytes(2, "little", signed=True),
            misc.channel_count.to_bytes(2, "little"),
            misc.ppq.to_bytes(2, "little"),
        )
    )

    # Data chunk: magic, total event size, then the raw events
    expected_size = sum(event.size for event in events)
    chunk = BytesIOEx(b"FLdt")
    chunk.seek(4)
    chunk.write_I(expected_size)
    for event in events:
        chunk.write(event.to_raw())

    # Whatever follows the first 8 bytes must match the announced size
    if chunk.tell() - 8 != expected_size:
        raise DataCorruptionDetectedError

    # Join header and data chunk into the final byte string
    chunk.seek(0)
    output = io.BytesIO()
    output.write(header)
    output.write(chunk.read())
    return output.getvalue()

save(save_path='')

Saves Project to disk.

Parameters:

Name Type Description Default
save_path Union[Path, str]

File path to save to.

''

Raises:

Type Description
AttributeError

When Project.save_path doesn't exist and save_path is not set.

OSError

Exception which caused the write to fail, most probably a permission/file-in-use error.

Source code in pyflp/project.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
def save(self, save_path: Union[Path, str] = "") -> None:
    """Saves `Project` to disk.

    When `save_path` is left at its default, the project is written to
    `Project.save_path` and the file previously stored there is kept as a
    backup with an additional ``.bak`` suffix. The stream is generated
    before anything on disk is touched, so a serialisation error leaves
    existing files untouched.

    Args:
        save_path (Union[Path, str], optional): File path to save to.

    Raises:
        AttributeError: When `Project.save_path` \
        doesn't exist and `save_path` is not set.
        OSError: Exception which caused the write to fail, \
        most probably a permission/file-in-use error.
    """

    # Type checking and init; "" and "." both mean "use Project.save_path"
    target = Path(save_path)
    use_default = target == Path(".")
    if use_default:
        own_path = getattr(self, "save_path", None)
        if own_path is None:
            raise AttributeError(
                "Optional argument 'path' cannot be default when Parser "
                "was initialised from a stream. Use get_stream() instead."
            )
        target = Path(own_path)

    # Serialise first: if this raises, no file has been renamed or opened
    stream = self.get_stream()

    # Keep the previous file as a backup when overwriting the project file
    backup = None
    if use_default and target.exists():
        backup = target.with_suffix(f"{target.suffix}.bak")
        if backup.exists():
            backup.unlink()
        target.rename(backup)

    try:
        with open(target, "wb") as fp:
            fp.write(stream)
    except OSError:
        # Rollback: drop the partial file and restore the backup
        if target.exists():
            target.unlink()
        if backup is not None:
            backup.rename(target)
        raise

slots_used()

Returns the total number of slots used across all inserts.

Source code in pyflp/project.py
85
86
87
88
89
90
91
92
def slots_used(self) -> int:
    """Count the slots, over all inserts, for which `is_used()` is true."""
    return sum(
        1 for insert in self.inserts for slot in insert.slots if slot.is_used()
    )

used_insert_nums()

Returns a set of Insert indexes to which channels are routed.

Source code in pyflp/project.py
76
77
78
79
80
81
82
83
def used_insert_nums(self) -> Set[int]:
    """Collect the distinct `target_insert` values of all channels.

    Channels whose `target_insert` is `None` do not contribute.
    """
    targets = (channel.target_insert for channel in self.channels)
    return {target for target in targets if target is not None}