diff --git a/include/ymysql/mysql_plus.h b/include/ymysql/mysql_plus.h index a0259e8..171f2bf 100644 --- a/include/ymysql/mysql_plus.h +++ b/include/ymysql/mysql_plus.h @@ -141,4 +141,14 @@ namespace ylib::mysql } }; + + + + /// + /// 协程调用 + /// 执行期间会让出协程,并等待再次调度 + /// + /// + /// + std::tuple co_query(ylib::mysql::prepare_statement* ppst); } diff --git a/include/yutil/coroution.h b/include/yutil/coroution.h new file mode 100644 index 0000000..cea6075 --- /dev/null +++ b/include/yutil/coroution.h @@ -0,0 +1,79 @@ +#pragma once +#include +#include +#include +#include +#include "yutil/thread.h" +#include "yutil/queue.hpp" +class co_thread_pool; +namespace ylib::co +{ + class coroutine { + public: + struct promise_type { + coroutine get_return_object() { + return coroutine{ std::coroutine_handle::from_promise(*this) }; + } + std::suspend_always initial_suspend() { return {}; } // 协程初始化时挂起 + std::suspend_never final_suspend() noexcept { return {}; } // 协程结束时挂起 + void return_void() {} + void unhandled_exception() { std::exit(1); } + }; + coroutine(std::coroutine_handle h) : coro(h) {} + ~coroutine() { if (coro) coro.destroy(); } + std::coroutine_handle coro; + }; + + /// + /// 协程调度器 + /// + class scheduler :public ylib::ithread { + public: + // 任务信息 + struct task_info { + // 任务回调 + std::function callback; + // 任务参数 + void* param = nullptr; + // 唤醒协程coco + std::coroutine_handle<>* coco = nullptr; + }; + public: + scheduler(); + ~scheduler(); + /// + /// 启动 + /// + /// 挂起协程执行任务线程池大小 + /// + bool start(uint32 thread_size); + void stop(); + /// + /// 投递协程 + /// + /// + void push(const task_info& info); + /// + /// 投递线程任务 + /// + /// + void push_t(std::function callback); + /// + /// 唤醒协程 + /// + /// + void resume(std::coroutine_handle<>* continuation); + private: + // 通过 ithread 继承 + bool run() override; + /// + /// 处理队列 + /// + void exec_queue(); + private: + // 协程处理队列 + ylib::queue m_queue; + // 线程池 + co_thread_pool* m_pool = nullptr; + }; +} diff --git a/src/ymysql/CMakeLists.txt b/src/ymysql/CMakeLists.txt index 81e9609..1c930ea 100644 --- a/src/ymysql/CMakeLists.txt +++ b/src/ymysql/CMakeLists.txt @@ -8,7 +8,7 @@ get_filename_component(LIBRARY_NAME ${CURRENT_DIR} NAME) ) if(MSVC) - include_directories("D:/3rdparty/mysql-connector-c++-8.2.0-winx64/include/jdbc") + include_directories("D:/3rdparty/mysql-connector-c++-8.3.0-winx64/include/jdbc") else() include_directories(/usr/local/mysql/connector-c++-8.3.0/include/jdbc) endif() diff --git a/src/ynet/src/http_parser.cpp b/src/ynet/src/http_parser.cpp index 797a57b..786a076 100644 --- a/src/ynet/src/http_parser.cpp +++ b/src/ynet/src/http_parser.cpp @@ -234,7 +234,7 @@ void network::http::form_parser::parse_count(std::vector &starts, std::v break; } else - cut_flag.append_c(c); + cut_flag.append(c); } } diff --git a/src/yutil/CMakeLists.txt b/src/yutil/CMakeLists.txt index 1d86698..9216207 100644 --- a/src/yutil/CMakeLists.txt +++ b/src/yutil/CMakeLists.txt @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 2.8) # 获取上级目录名做为库名 get_filename_component(CURRENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} ABSOLUTE) get_filename_component(LIBRARY_NAME ${CURRENT_DIR} NAME) - + file(GLOB SOURCE_FILES "${PROJECT_SOURCE_DIR}/include/${LIBRARY_NAME}/*.h" "${PROJECT_SOURCE_DIR}/include/${LIBRARY_NAME}/*.hpp" @@ -17,7 +17,7 @@ endif() # 创建库 add_library(${LIBRARY_NAME} ${SOURCE_FILES}) - + ######################## 安装 ######################## install(TARGETS ${LIBRARY_NAME} DESTINATION lib) \ No newline at end of file diff --git a/src/yutil/src/coroution.cpp b/src/yutil/src/coroution.cpp new file mode 100644 index 0000000..e4b6397 --- /dev/null +++ b/src/yutil/src/coroution.cpp @@ -0,0 +1,131 @@ +#include "yutil/coroution.h" +#include "HPSocket/HPSocket.h" +#include "yutil/system.h" +/// +/// 协程线程池 +/// +struct co_thread_pool_param { + void* param = nullptr; + ylib::co::scheduler* scheduler = nullptr; + std::function callback; +}; + + +VOID __HP_CALL co_thread_pool_callback(PVOID pvArg) +{ + auto cp = (co_thread_pool_param*)pvArg; + cp->callback(); + delete cp; +} +class co_thread_pool +{ +public: + +public: + co_thread_pool() + { + m_pool = HP_Create_ThreadPool(); + } + ~co_thread_pool() + { + HP_Destroy_ThreadPool(m_pool); + } + bool start(uint32 thread_size) + { + stop(); + return m_pool->Start(thread_size, 0, TRP_WAIT_FOR); + } + void stop() + { + m_pool->Stop(INFINITE); + } + bool push(std::function callback, ylib::co::scheduler* sche) + { + co_thread_pool_param* cp = new co_thread_pool_param(); + cp->callback = callback; + cp->param = nullptr; + cp->scheduler = sche; + return m_pool->Submit(co_thread_pool_callback,(PVOID)cp, INFINITE); + } + + + void on_worker_end(std::function callback) { m_on_worker_end = callback; } + friend class thread_pool_listener; +private: + std::function m_on_worker_end; + IHPThreadPool* m_pool = nullptr; +}; + + +ylib::co::coroutine co_function(const std::function callback,void* param, ylib::co::scheduler* scheduler) +{ + callback(param, scheduler); + co_await std::suspend_always{}; +} + +ylib::co::scheduler::scheduler() +{ + m_pool = new co_thread_pool(); +} +ylib::co::scheduler::~scheduler() +{ + m_pool->stop(); + delete m_pool; +} +bool ylib::co::scheduler::start(uint32 thread_size) +{ + if (!m_pool->start(thread_size)) + return false; + ::ithread::start(); + return true; +} +void ylib::co::scheduler::stop() +{ + m_pool->stop(); + ::ithread::stop(); + ::ithread::wait(); +} +void ylib::co::scheduler::push(const task_info& info) +{ + m_queue.push(info); +} +void ylib::co::scheduler::push_t(std::function callback) +{ + m_pool->push(callback,this); +} +void ylib::co::scheduler::resume(std::coroutine_handle<>* continuation) +{ + task_info ti; + ti.coco = continuation; + m_queue.push(ti); +} +bool ylib::co::scheduler::run() +{ + // 处理任务 + while (m_queue.size() != 0) + { + exec_queue(); + } + system::sleep_msec(1); + return true; +} + +void ylib::co::scheduler::exec_queue() +{ + task_info ti; + while (m_queue.pop(ti)) + { + if (ti.coco == nullptr) + { + //创建协程 + auto coro = co_function(ti.callback, ti.param, this); + coro.coro.resume(); + } + else + { + //唤醒协程 + ti.coco->resume(); + } + + } +} diff --git a/src/yutil/src/package.cpp b/src/yutil/src/package.cpp index 8fff460..0daa2e9 100644 --- a/src/yutil/src/package.cpp +++ b/src/yutil/src/package.cpp @@ -109,7 +109,7 @@ void ylib::package::to(const std::string& password, ylib::buffer& data) signal_name.append(iter->second->name); } - signal_data.append_uc((uchar)3); + signal_data.append(3); signal_data.append(bytes::to_buffer((int32)iter->second->data.length())); signal_data.append(iter->second->data); // 打包加密 diff --git a/src/yutil/src/thread.cpp b/src/yutil/src/thread.cpp index 2dcdb50..9dbfa9d 100644 --- a/src/yutil/src/thread.cpp +++ b/src/yutil/src/thread.cpp @@ -2,7 +2,7 @@ #include #include "yutil/system.h" #include "yutil/system.h" - void ylib_thread_thread(void* lpParam) +void ylib_thread_thread(void* lpParam) { auto t = (ylib::ithread*)lpParam; t->__thread_handle(lpParam); @@ -98,6 +98,15 @@ HP_Destroy_ThreadPool((IHPThreadPool*)m_hp_thread); } + bool ylib::thread_pool::start(uint32 thread_size) + { + return false; + } + + void ylib::thread_pool::stop() + { + } + bool thread_pool::start(uint32 thread_count, uint32 max_queue_size) { return ((IHPThreadPool*)m_hp_thread)->Start(thread_count, max_queue_size, TRP_CALL_FAIL);