Observers
Categories:
A key feature of RsyncUI is observation for two notifications:
NSNotification.Name.NSFileHandleDataAvailable
Process.didTerminateNotification
Without observation and required actions when observed, RsyncUI becomes useless. Both observations are linked to the external task executing the actual rsync task.
Notifications
The initial notification, NSNotification.Name.NSFileHandleDataAvailable
, monitors the generation of output by the external task. To display the progress of a synchronization task, RsyncUI relies on monitoring the output from rsync. Consequently, the —verbose
parameter to rsync is essential. This parameter instructs rsync to output information during execution.
The second notification, Process.didTerminateNotification
, monitors the termination of the task, such as an abrupt halt. Typically, a termination signifies task completion. However, it can also indicate an abort action from the user, which subsequently sends an interrupt signal to the external task. If RsyncUI fails to detect this signal, it will be unable to discern when a synchronization task has been completed.
In the latest version of RsyncUI (2.6.4), a new method for observing notifications have been introduced. After some investigation and recommendation, the AsyncSequence method emerges as the preferred approach. By applying this the Combine Framework and depended code is removed.
NotificationCenter.Notifications and AsyncSequence
If you search for the following: NotificationCenter.Notifications + AsyncSequence, by AI Google summarize the search by:
NotificationCenter.Notifications provides a method to receive notifications from NotificationCenter as an AsyncSequence, enabling asynchronous iteration using the for await syntax. This modern approach to handling notifications in Swift concurrency contrasts with older methods that may utilize Combine or require explicit handling of NSNotification objects.
Quote Apple documentation: “An asynchronous sequence of notifications generated by a notification center.” The for await loops iterate over the notifications and respond to each one. Upon detecting the termination signal, the tasks must also be canceled, and the observers must be removed.
// AsyncSequence
let sequencefilehandler = NotificationCenter.default.notifications(named: NSNotification.Name.NSFileHandleDataAvailable, object: nil)
let sequencetermination = NotificationCenter.default.notifications(named: Process.didTerminateNotification, object: nil)
// Tasks
var sequenceFileHandlerTask: Task<Void, Never>?
var sequenceTerminationTask: Task<Void, Never>?
....
sequenceFileHandlerTask = Task {
for await _ in sequencefilehandler {
await self.datahandle(pipe)
}
}
sequenceTerminationTask = Task {
for await _ in sequencetermination {
Task {
try await Task.sleep(seconds: 0.5)
await self.termination()
}
}
}
Upon task completion, the Observers must be released, and corresponding Tasks must be canceled to prevent a retain cycle and memory leak.
func datahandle(_ pipe: Pipe) async {
let outHandle = pipe.fileHandleForReading
let data = outHandle.availableData
if data.count > 0 {
if let str = NSString(data: data, encoding: String.Encoding.utf8.rawValue) {
str.enumerateLines { line, _ in
self.output.append(line)
if SharedReference.shared.checkforerrorinrsyncoutput,
self.errordiscovered == false
{
do {
try self.checklineforerror?.checkforrsyncerror(line)
} catch let e {
self.errordiscovered = true
let error = e
self.propogateerror(error: error)
}
}
}
// Send message about files
if usefilehandler {
filehandler(output.count)
}
}
outHandle.waitForDataInBackgroundAndNotify()
}
}
func termination() async {
processtermination(output, config?.hiddenID)
// Log error in rsync output to file
if errordiscovered, let config {
Task {
await ActorLogToFile(command: config.backupID,
stringoutputfromrsync: output)
}
}
SharedReference.shared.process = nil
// Remove observers
NotificationCenter.default.removeObserver(sequencefilehandler as Any,
name: NSNotification.Name.NSFileHandleDataAvailable,
object: nil)
NotificationCenter.default.removeObserver(sequencetermination as Any,
name: Process.didTerminateNotification,
object: nil)
// Cancel Tasks
sequenceFileHandlerTask?.cancel()
sequenceTerminationTask?.cancel()
Logger.process.info("ProcessRsyncAsyncSequence: process = nil and termination discovered")
}
Removed methods
The two following methods, which still works very well, are removed from the code.
Combine and Publisher
The Combine framework is utilized within the ProcessRsync
and ProcessCommand
objects, which is responsible for initiating external tasks, such as the rsync
synchronize task. The rsync
synchronize task is completed when the last notification is observed. By using Combine, a publisher is added to the Notification center. Every time the Notification center discover one of the notifications, it publish a message.
// Combine, subscribe to NSNotification.Name.NSFileHandleDataAvailable
NotificationCenter.default.publisher(
for: NSNotification.Name.NSFileHandleDataAvailable)
.sink { [self] _ in
....
}.store(in: &subscriptons)
// Combine, subscribe to Process.didTerminateNotification
NotificationCenter.default.publisher(
for: Process.didTerminateNotification)
.debounce(for: .milliseconds(500), scheduler: DispatchQueue.main)
.sink { [self] _ in
....
subscriptons.removeAll()
}.store(in: &subscriptons)
NotificationCenter and addObserver
The second method for observing notifications involves adding Observers to the Notification center. Upon the discovery of a notification, the completion handler is executed. The Process object is annotated to execute on the main thread. It appears that the addObserver closure is marked as Sendable, indicating that mutating properties within the closure must be asynchronous. This is due to the Swift 6 language mode and strict concurrency checking.
// Observers
var notificationsfilehandle: NSObjectProtocol?
var notificationstermination: NSObjectProtocol?
....
notificationsfilehandle =
NotificationCenter.default.addObserver(forName: NSNotification.Name.NSFileHandleDataAvailable,
object: nil, queue: nil)
{ _ in
Task {
await self.datahandle(pipe)
}
}
notificationstermination =
NotificationCenter.default.addObserver(forName: Process.didTerminateNotification,
object: task, queue: nil)
{ _ in
Task {
// Debounce termination for 500 ms
try await Task.sleep(seconds: 0.5)
await self.termination()
}
}
Upon task completion, the Observers must be released to prevent a retain cycle and memory leak.
func datahandle(_ pipe: Pipe) async {
let outHandle = pipe.fileHandleForReading
let data = outHandle.availableData
if data.count > 0 {
if let str = NSString(data: data, encoding: String.Encoding.utf8.rawValue) {
str.enumerateLines { line, _ in
self.output.append(line)
if SharedReference.shared.checkforerrorinrsyncoutput,
self.errordiscovered == false
{
do {
try self.checklineforerror?.checkforrsyncerror(line)
} catch let e {
self.errordiscovered = true
let error = e
self.propogateerror(error: error)
}
}
}
// Send message about files
if usefilehandler {
filehandler(output.count)
}
}
outHandle.waitForDataInBackgroundAndNotify()
}
}
func termination() async {
processtermination(output, config?.hiddenID)
// Log error in rsync output to file
if errordiscovered, let config {
Task {
await ActorLogToFile(command: config.backupID,
stringoutputfromrsync: output)
}
}
SharedReference.shared.process = nil
NotificationCenter.default.removeObserver(notificationsfilehandle as Any,
name: NSNotification.Name.NSFileHandleDataAvailable,
object: nil)
NotificationCenter.default.removeObserver(notificationstermination as Any,
name: Process.didTerminateNotification,
object: nil)
Logger.process.info("ProcessRsyncObserving: process = nil and termination discovered")
}