use sea_orm::{ActiveModelTrait, Database, DatabaseConnection, DbErr, EntityTrait, QueryFilter, ColumnTrait, Set, QueryOrder};
use tokio::runtime::Runtime;
use std::sync::{Arc};
+use std::thread::JoinHandle;
use tokio::sync::{broadcast, mpsc};
use migration::{Migrator, MigratorTrait};
use ui_rs::ui;
/// messages to the backend via the tx Sender.
///
/// The handle also contains a broadcast sender/receiver (btx, brx).
- pub fn start(self) -> BackendHandle {
+ pub fn start(self) -> (BackendHandle, JoinHandle<()>) {
let (tx, mut rx) = mpsc::unbounded_channel::<CmdFuture>();
let broadcast_tx = self.broadcast.clone();
let broadcast_rx = broadcast_tx.subscribe();
let rt = backend.rt.clone();
- std::thread::spawn(move || {
+ let join = std::thread::spawn(move || {
rt.block_on(async move {
- while let Some(cmd) = rx.recv().await {
- tokio::spawn(async move {
- cmd.await
- });
+ let mut set = tokio::task::JoinSet::new();
+
+ loop {
+ tokio::select! {
+ cmd = rx.recv() => {
+ if let Some(cmd) = cmd {
+ set.spawn(async move {
+ cmd.await
+ });
+ } else {
+ break;
+ }
+ }
+
+ Some(result) = set.join_next() => {
+ if let Err(e) = result {
+ println!("task failed {:?}", e);
+ }
+ }
+
+ else => {
+ break;
+ }
+ }
+ }
+
+ // wait for remaining tasks on shutdown
+ while let Some(result) = set.join_next().await {
+ if let Err(e) = result {
+ println!("task failed {:?}", e);
+ }
}
});
});
- BackendHandle {
+ let backend_handle = BackendHandle {
backend: backend_clone,
tx: tx,
btx: broadcast_tx,
brx: broadcast_rx,
- }
+ };
+
+ (backend_handle, join)
}
}