// <experimental/io_context> -*- C++ -*-
2
3// Copyright (C) 2015-2021 Free Software Foundation, Inc.
4//
5// This file is part of the GNU ISO C++ Library.  This library is free
6// software; you can redistribute it and/or modify it under the
7// terms of the GNU General Public License as published by the
8// Free Software Foundation; either version 3, or (at your option)
9// any later version.
10
11// This library is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14// GNU General Public License for more details.
15
16// Under Section 7 of GPL version 3, you are granted additional
17// permissions described in the GCC Runtime Library Exception, version
18// 3.1, as published by the Free Software Foundation.
19
20// You should have received a copy of the GNU General Public License and
21// a copy of the GCC Runtime Library Exception along with this program;
22// see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
23// <http://www.gnu.org/licenses/>.
24
25/** @file experimental/io_context
26 *  This is a TS C++ Library header.
27 *  @ingroup networking-ts
28 */
29
30#ifndef _GLIBCXX_EXPERIMENTAL_IO_SERVICE
31#define _GLIBCXX_EXPERIMENTAL_IO_SERVICE 1
32
33#pragma GCC system_header
34
35#if __cplusplus >= 201402L
36
#include <algorithm>
#include <atomic>
#include <chrono>
#include <forward_list>
#include <functional>
#include <limits>
#include <queue>
#include <system_error>
#include <thread>
#include <vector>
#include <experimental/netfwd>
#include <experimental/executor>
#if _GLIBCXX_HAVE_UNISTD_H
# include <unistd.h>
#endif
#ifdef _GLIBCXX_HAVE_POLL_H
# include <poll.h>
#endif
#ifdef _GLIBCXX_HAVE_FCNTL_H
# include <fcntl.h>
#endif
55
56namespace std _GLIBCXX_VISIBILITY(default)
57{
58_GLIBCXX_BEGIN_NAMESPACE_VERSION
59namespace experimental
60{
61namespace net
62{
63inline namespace v1
64{
65
66  /** @addtogroup networking-ts
67   *  @{
68   */
69
70  class __socket_impl;
71
72  /// An ExecutionContext for I/O operations.
73  class io_context : public execution_context
74  {
75  public:
76    // types:
77
78    /// An executor for an io_context.
79    class executor_type
80    {
81    public:
82      // construct / copy / destroy:
83
84      executor_type(const executor_type& __other) noexcept = default;
85      executor_type(executor_type&& __other) noexcept = default;
86
87      executor_type& operator=(const executor_type& __other) noexcept = default;
88      executor_type& operator=(executor_type&& __other) noexcept = default;
89
90      // executor operations:
91
92      bool running_in_this_thread() const noexcept
93      {
94#ifdef _GLIBCXX_HAS_GTHREADS
95	lock_guard<execution_context::mutex_type> __lock(_M_ctx->_M_mtx);
96	auto __end = _M_ctx->_M_call_stack.end();
97	return std::find(_M_ctx->_M_call_stack.begin(), __end,
98			 this_thread::get_id()) != __end;
99#else
100	return _M_ctx->_M_run_count != 0;
101#endif
102      }
103
104      io_context& context() const noexcept { return *_M_ctx; }
105
106      void on_work_started() const noexcept { ++_M_ctx->_M_work_count; }
107      void on_work_finished() const noexcept { --_M_ctx->_M_work_count; }
108
109      template<typename _Func, typename _ProtoAllocator>
110	void
111	dispatch(_Func&& __f, const _ProtoAllocator& __a) const
112	{
113	  if (running_in_this_thread())
114	    decay_t<_Func>{std::forward<_Func>(__f)}();
115	  else
116	    post(std::forward<_Func>(__f), __a);
117	}
118
119      template<typename _Func, typename _ProtoAllocator>
120	void
121	post(_Func&& __f, const _ProtoAllocator& __a) const
122	{
123	  lock_guard<execution_context::mutex_type> __lock(_M_ctx->_M_mtx);
124	  // TODO (re-use functionality in system_context)
125	  _M_ctx->_M_reactor._M_notify();
126	}
127
128      template<typename _Func, typename _ProtoAllocator>
129	void
130	defer(_Func&& __f, const _ProtoAllocator& __a) const
131	{ post(std::forward<_Func>(__f), __a); }
132
133    private:
134      friend io_context;
135
136      explicit
137      executor_type(io_context& __ctx) : _M_ctx(std::addressof(__ctx)) { }
138
139      io_context* _M_ctx;
140    };
141
142    using count_type =  size_t;
143
144    // construct / copy / destroy:
145
146    io_context() : _M_work_count(0) { }
147
148    explicit
149    io_context(int __concurrency_hint) : _M_work_count(0) { }
150
151    io_context(const io_context&) = delete;
152    io_context& operator=(const io_context&) = delete;
153
154    // io_context operations:
155
156    executor_type get_executor() noexcept { return executor_type(*this); }
157
158    count_type
159    run()
160    {
161      count_type __n = 0;
162      while (run_one())
163	if (__n != numeric_limits<count_type>::max())
164	  ++__n;
165      return __n;
166    }
167
168    template<typename _Rep, typename _Period>
169      count_type
170      run_for(const chrono::duration<_Rep, _Period>& __rel_time)
171      { return run_until(chrono::steady_clock::now() + __rel_time); }
172
173    template<typename _Clock, typename _Duration>
174      count_type
175      run_until(const chrono::time_point<_Clock, _Duration>& __abs_time)
176      {
177	count_type __n = 0;
178	while (run_one_until(__abs_time))
179	  if (__n != numeric_limits<count_type>::max())
180	    ++__n;
181	return __n;
182      }
183
184    count_type
185    run_one()
186    { return _M_do_one(chrono::milliseconds{-1}); }
187
188    template<typename _Rep, typename _Period>
189      count_type
190      run_one_for(const chrono::duration<_Rep, _Period>& __rel_time)
191      { return run_one_until(chrono::steady_clock::now() + __rel_time); }
192
193    template<typename _Clock, typename _Duration>
194      count_type
195      run_one_until(const chrono::time_point<_Clock, _Duration>& __abs_time)
196      {
197	auto __now = _Clock::now();
198	while (__now < __abs_time)
199	  {
200	    using namespace std::chrono;
201	    auto __ms = duration_cast<milliseconds>(__abs_time - __now);
202	    if (_M_do_one(__ms))
203	      return 1;
204	    __now = _Clock::now();
205	  }
206	return 0;
207      }
208
209    count_type
210    poll()
211    {
212      count_type __n = 0;
213      while (poll_one())
214	if (__n != numeric_limits<count_type>::max())
215	  ++__n;
216      return __n;
217    }
218
219    count_type
220    poll_one()
221    { return _M_do_one(chrono::milliseconds{0}); }
222
223    void stop()
224    {
225      lock_guard<execution_context::mutex_type> __lock(_M_mtx);
226      _M_stopped = true;
227      _M_reactor._M_notify();
228    }
229
230    bool stopped() const noexcept
231    {
232      lock_guard<execution_context::mutex_type> __lock(_M_mtx);
233      return _M_stopped;
234    }
235
236    void restart()
237    {
238      _M_stopped = false;
239    }
240
241  private:
242
243    template<typename _Clock, typename _WaitTraits>
244      friend class basic_waitable_timer;
245
246    friend __socket_impl;
247
248    template<typename _Protocol>
249      friend class __basic_socket_impl;
250
251    template<typename _Protocol>
252      friend class basic_socket;
253
254    template<typename _Protocol>
255      friend class basic_datagram_socket;
256
257    template<typename _Protocol>
258      friend class basic_stream_socket;
259
260    template<typename _Protocol>
261      friend class basic_socket_acceptor;
262
263    count_type
264    _M_outstanding_work() const
265    { return _M_work_count + !_M_ops.empty(); }
266
267    struct __timer_queue_base : execution_context::service
268    {
269      // return milliseconds until next timer expires, or milliseconds::max()
270      virtual chrono::milliseconds _M_next() const = 0;
271      virtual bool run_one() = 0;
272
273    protected:
274      explicit
275      __timer_queue_base(execution_context& __ctx) : service(__ctx)
276      {
277	auto& __ioc = static_cast<io_context&>(__ctx);
278	lock_guard<execution_context::mutex_type> __lock(__ioc._M_mtx);
279	__ioc._M_timers.push_back(this);
280      }
281
282      mutable execution_context::mutex_type _M_qmtx;
283    };
284
285    template<typename _Timer, typename _Key = typename _Timer::_Key>
286      struct __timer_queue : __timer_queue_base
287      {
288	using key_type = __timer_queue;
289
290	explicit
291	__timer_queue(execution_context& __ctx) : __timer_queue_base(__ctx)
292	{ }
293
294	void shutdown() noexcept { }
295
296	io_context& context() noexcept
297	{ return static_cast<io_context&>(service::context()); }
298
299	// Start an asynchronous wait.
300	void
301	push(const _Timer& __t, function<void(error_code)> __h)
302	{
303	  context().get_executor().on_work_started();
304	  lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
305	  _M_queue.emplace(__t, _M_next_id++, std::move(__h));
306	  // no need to notify reactor unless this timer went to the front?
307	}
308
309	// Cancel all outstanding waits for __t
310	size_t
311	cancel(const _Timer& __t)
312	{
313	  lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
314	  size_t __count = 0;
315	  auto __last = _M_queue.end();
316	  for (auto __it = _M_queue.begin(), __end = __last; __it != __end;
317	      ++__it)
318	    {
319	      if (__it->_M_key == __t._M_key.get())
320		{
321		  __it->cancel();
322		  __last = __it;
323		  ++__count;
324		}
325	    }
326	  if (__count)
327	    _M_queue._M_sort_to(__last);
328	  return __count;
329	}
330
331	// Cancel oldest outstanding wait for __t
332	bool
333	cancel_one(const _Timer& __t)
334	{
335	  lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
336	  const auto __end = _M_queue.end();
337	  auto __oldest = __end;
338	  for (auto __it = _M_queue.begin(); __it != __end; ++__it)
339	    if (__it->_M_key == __t._M_key.get())
340	      if (__oldest == __end || __it->_M_id < __oldest->_M_id)
341		__oldest = __it;
342	  if (__oldest == __end)
343	    return false;
344	  __oldest->cancel();
345	  _M_queue._M_sort_to(__oldest);
346	  return true;
347	}
348
349	chrono::milliseconds
350	_M_next() const override
351	{
352	  typename _Timer::time_point __exp;
353	  {
354	    lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
355	    if (_M_queue.empty())
356	      return chrono::milliseconds::max();  // no pending timers
357	    if (_M_queue.top()._M_key == nullptr)
358	      return chrono::milliseconds::zero(); // cancelled, run now
359	    __exp = _M_queue.top()._M_expiry;
360	  }
361	  auto __dur = _Timer::traits_type::to_wait_duration(__exp);
362	  if (__dur < __dur.zero())
363	    __dur = __dur.zero();
364	  return chrono::duration_cast<chrono::milliseconds>(__dur);
365	}
366
367      private:
368
369	bool run_one() override
370	{
371	  auto __now = _Timer::clock_type::now();
372	  function<void(error_code)> __h;
373	  error_code __ec;
374	  {
375	    lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
376
377	    if (_M_queue.top()._M_key == nullptr) // cancelled
378	      {
379		__h = std::move(_M_queue.top()._M_h);
380		__ec = std::make_error_code(errc::operation_canceled);
381		_M_queue.pop();
382	      }
383	    else if (_M_queue.top()._M_expiry <= _Timer::clock_type::now())
384	      {
385		__h = std::move(_M_queue.top()._M_h);
386		_M_queue.pop();
387	      }
388	  }
389	  if (__h)
390	    {
391	      __h(__ec);
392	      context().get_executor().on_work_finished();
393	      return true;
394	    }
395	  return false;
396	}
397
398	using __timer_id_type = uint64_t;
399
400	struct __pending_timer
401	{
402	  __pending_timer(const _Timer& __t, uint64_t __id,
403			  function<void(error_code)> __h)
404	  : _M_expiry(__t.expiry()), _M_key(__t._M_key.get()), _M_id(__id),
405	    _M_h(std::move(__h))
406	  { }
407
408	  typename _Timer::time_point _M_expiry;
409	  _Key* _M_key;
410	  __timer_id_type _M_id;
411	  function<void(error_code)> _M_h;
412
413	  void cancel() { _M_expiry = _M_expiry.min(); _M_key = nullptr; }
414
415	  bool
416	  operator<(const __pending_timer& __rhs) const
417	  { return _M_expiry < __rhs._M_expiry; }
418	};
419
420	struct __queue : priority_queue<__pending_timer>
421	{
422	  using iterator =
423	    typename priority_queue<__pending_timer>::container_type::iterator;
424
425	  // expose begin/end/erase for direct access to underlying container
426	  iterator begin() { return this->c.begin(); }
427	  iterator end() { return this->c.end(); }
428	  iterator erase(iterator __it) { return this->c.erase(__it); }
429
430	  void
431	  _M_sort_to(iterator __it)
432	  { std::stable_sort(this->c.begin(), ++__it); }
433	};
434
435	__queue	_M_queue;
436	__timer_id_type _M_next_id = 0;
437      };
438
439    template<typename _Timer, typename _CompletionHandler>
440      void
441      async_wait(const _Timer& __timer, _CompletionHandler&& __h)
442      {
443	auto& __queue = use_service<__timer_queue<_Timer>>(*this);
444	__queue.push(__timer, std::move(__h));
445	_M_reactor._M_notify();
446      }
447
448    // Cancel all wait operations initiated by __timer.
449    template<typename _Timer>
450      size_t
451      cancel(const _Timer& __timer)
452      {
453	if (!has_service<__timer_queue<_Timer>>(*this))
454	  return 0;
455
456	auto __c = use_service<__timer_queue<_Timer>>(*this).cancel(__timer);
457	if (__c != 0)
458	  _M_reactor._M_notify();
459	return __c;
460      }
461
462    // Cancel the oldest wait operation initiated by __timer.
463    template<typename _Timer>
464      size_t
465      cancel_one(const _Timer& __timer)
466      {
467	if (!has_service<__timer_queue<_Timer>>(*this))
468	  return 0;
469
470	if (use_service<__timer_queue<_Timer>>(*this).cancel_one(__timer))
471	  {
472	    _M_reactor._M_notify();
473	    return 1;
474	  }
475	return 0;
476      }
477
478    template<typename _Op>
479      void
480      async_wait(int __fd, int __w, _Op&& __op)
481      {
482	lock_guard<execution_context::mutex_type> __lock(_M_mtx);
483	// TODO need push_back, use std::list not std::forward_list
484	auto __tail = _M_ops.before_begin(), __it = _M_ops.begin();
485	while (__it != _M_ops.end())
486	  {
487	    ++__it;
488	    ++__tail;
489	  }
490	using __type = __async_operation_impl<_Op>;
491	_M_ops.emplace_after(__tail,
492			     make_unique<__type>(std::move(__op), __fd, __w));
493	_M_reactor._M_fd_interest(__fd, __w);
494      }
495
496    void _M_add_fd(int __fd) { _M_reactor._M_add_fd(__fd); }
497    void _M_remove_fd(int __fd) { _M_reactor._M_remove_fd(__fd); }
498
499    void cancel(int __fd, error_code&)
500    {
501      lock_guard<execution_context::mutex_type> __lock(_M_mtx);
502      const auto __end = _M_ops.end();
503      auto __it = _M_ops.begin();
504      auto __prev = _M_ops.before_begin();
505      while (__it != __end && (*__it)->_M_is_cancelled())
506	{
507	  ++__it;
508	  ++__prev;
509	}
510      auto __cancelled = __prev;
511      while (__it != __end)
512	{
513	  if ((*__it)->_M_fd == __fd)
514	    {
515	      (*__it)->cancel();
516	      ++__it;
517	      _M_ops.splice_after(__cancelled, _M_ops, __prev);
518	      ++__cancelled;
519	    }
520	  else
521	    {
522	      ++__it;
523	      ++__prev;
524	    }
525	}
526      _M_reactor._M_not_interested(__fd);
527    }
528
529    struct __async_operation
530    {
531      __async_operation(int __fd, int __ev) : _M_fd(__fd), _M_ev(__ev) { }
532
533      virtual ~__async_operation() = default;
534
535      int _M_fd;
536      short _M_ev;
537
538      void cancel() { _M_fd = -1; }
539      bool _M_is_cancelled() const { return _M_fd == -1; }
540      virtual void run(io_context&) = 0;
541    };
542
543    template<typename _Op>
544      struct __async_operation_impl : __async_operation
545      {
546	__async_operation_impl(_Op&& __op, int __fd, int __ev)
547	: __async_operation{__fd, __ev}, _M_op(std::move(__op)) { }
548
549	_Op _M_op;
550
551	void run(io_context& __ctx)
552	{
553	  if (_M_is_cancelled())
554	    _M_op(std::make_error_code(errc::operation_canceled));
555	  else
556	    _M_op(error_code{});
557	}
558      };
559
560    atomic<count_type>		_M_work_count;
561    mutable execution_context::mutex_type		_M_mtx;
562    queue<function<void()>>	_M_op;
563    bool			_M_stopped = false;
564
565    struct __monitor
566    {
567      __monitor(io_context& __c) : _M_ctx(__c)
568      {
569#ifdef _GLIBCXX_HAS_GTHREADS
570	lock_guard<execution_context::mutex_type> __lock(_M_ctx._M_mtx);
571	_M_ctx._M_call_stack.push_back(this_thread::get_id());
572#else
573	_M_ctx._M_run_count++;
574#endif
575      }
576
577      ~__monitor()
578      {
579#ifdef _GLIBCXX_HAS_GTHREADS
580	lock_guard<execution_context::mutex_type> __lock(_M_ctx._M_mtx);
581	_M_ctx._M_call_stack.pop_back();
582#else
583	_M_ctx._M_run_count--;
584#endif
585	if (_M_ctx._M_outstanding_work() == 0)
586	  {
587	    _M_ctx._M_stopped = true;
588	    _M_ctx._M_reactor._M_notify();
589	  }
590      }
591
592      __monitor(__monitor&&) = delete;
593
594      io_context& _M_ctx;
595    };
596
597    bool
598    _M_do_one(chrono::milliseconds __timeout)
599    {
600      const bool __block = __timeout != chrono::milliseconds::zero();
601
602      __reactor::__fdvec __fds;
603
604      __monitor __mon{*this};
605
606      __timer_queue_base* __timerq = nullptr;
607      unique_ptr<__async_operation> __async_op;
608
609      while (true)
610	{
611	  if (__timerq)
612	    {
613	      if (__timerq->run_one())
614		return true;
615	      else
616		__timerq = nullptr;
617	    }
618
619	  if (__async_op)
620	    {
621	      __async_op->run(*this);
622	      // TODO need to unregister __async_op
623	      return true;
624	    }
625
626	  chrono::milliseconds __ms{0};
627
628	  {
629	    lock_guard<execution_context::mutex_type> __lock(_M_mtx);
630
631	    if (_M_stopped)
632	      return false;
633
634	    // find first timer with something to do
635	    for (auto __q : _M_timers)
636	      {
637		auto __next = __q->_M_next();
638		if (__next == __next.zero())  // ready to run immediately
639		  {
640		    __timerq = __q;
641		    __ms = __next;
642		    break;
643		  }
644		else if (__next != __next.max() && __block
645		    && (__next < __ms || __timerq == nullptr))
646		  {
647		    __timerq = __q;
648		    __ms = __next;
649		  }
650	      }
651
652	    if (__timerq && __ms == __ms.zero())
653	      continue;  // restart loop to run a timer immediately
654
655	    if (!_M_ops.empty() && _M_ops.front()->_M_is_cancelled())
656	      {
657		_M_ops.front().swap(__async_op);
658		_M_ops.pop_front();
659		continue;
660	      }
661
662	    // TODO run any posted items
663
664	    if (__block)
665	      {
666		if (__timerq == nullptr)
667		  __ms = __timeout;
668		else if (__ms.zero() <= __timeout && __timeout < __ms)
669		  __ms = __timeout;
670		else if (__ms.count() > numeric_limits<int>::max())
671		  __ms = chrono::milliseconds{numeric_limits<int>::max()};
672	      }
673	    // else __ms == 0 and poll() will return immediately
674
675	  }
676
677	  auto __res = _M_reactor.wait(__fds, __ms);
678
679	  if (__res == __reactor::_S_retry)
680	    continue;
681
682	  if (__res == __reactor::_S_timeout)
683	    {
684	      if (__timerq == nullptr)
685		return false;
686	      else
687		continue;  // timed out, so restart loop and process the timer
688	    }
689
690	  __timerq = nullptr;
691
692	  if (__fds.empty()) // nothing to do
693	    return false;
694
695	  lock_guard<execution_context::mutex_type> __lock(_M_mtx);
696	  for (auto __it = _M_ops.begin(), __end = _M_ops.end(),
697	      __prev = _M_ops.before_begin(); __it != __end; ++__it, ++__prev)
698	    {
699	      auto& __op = **__it;
700	      auto __pos = std::lower_bound(__fds.begin(), __fds.end(),
701		  __op._M_fd,
702		  [](const auto& __p, int __fd) { return __p.fd < __fd; });
703	      if (__pos != __fds.end() && __pos->fd == __op._M_fd
704		  && __pos->revents & __op._M_ev)
705		{
706		  __it->swap(__async_op);
707		  _M_ops.erase_after(__prev);
708		  break;  // restart loop and run op
709		}
710	    }
711	}
712    }
713
714    struct __reactor
715    {
716      __reactor() : _M_fds(1)
717      {
718	int __pipe[2];
719	if (::pipe(__pipe) == -1)
720	  __throw_system_error(errno);
721	if (::fcntl(__pipe[0], F_SETFL, O_NONBLOCK) == -1
722	    || ::fcntl(__pipe[1], F_SETFL, O_NONBLOCK) == -1)
723	  {
724	    int __e = errno;
725	    ::close(__pipe[0]);
726	    ::close(__pipe[1]);
727	    __throw_system_error(__e);
728	  }
729	_M_fds.back().events	= POLLIN;
730	_M_fds.back().fd	= __pipe[0];
731	_M_notify_wr		= __pipe[1];
732      }
733
734      ~__reactor()
735      {
736	::close(_M_fds.back().fd);
737	::close(_M_notify_wr);
738      }
739
740      // write a notification byte to the pipe (ignoring errors)
741      void _M_notify()
742      {
743	int __n;
744	do {
745	  __n = ::write(_M_notify_wr, "", 1);
746	} while (__n == -1 && errno == EINTR);
747      }
748
749      // read all notification bytes from the pipe
750      void _M_on_notify()
751      {
752	// Drain the pipe.
753	char __buf[64];
754	ssize_t __n;
755	do {
756	  __n = ::read(_M_fds.back().fd, __buf, sizeof(__buf));
757	} while (__n != -1 || errno == EINTR);
758      }
759
760      void
761      _M_add_fd(int __fd)
762      {
763	auto __pos = _M_lower_bound(__fd);
764	if (__pos->fd == __fd)
765	  __throw_system_error((int)errc::invalid_argument);
766	_M_fds.insert(__pos, __fdvec::value_type{})->fd = __fd;
767	_M_notify();
768      }
769
770      void
771      _M_remove_fd(int __fd)
772      {
773	auto __pos = _M_lower_bound(__fd);
774	if (__pos->fd == __fd)
775	  _M_fds.erase(__pos);
776	// else bug!
777	_M_notify();
778      }
779
780      void
781      _M_fd_interest(int __fd, int __w)
782      {
783	auto __pos = _M_lower_bound(__fd);
784	if (__pos->fd == __fd)
785	  __pos->events |= __w;
786	// else bug!
787	_M_notify();
788      }
789
790      void
791      _M_not_interested(int __fd)
792      {
793	auto __pos = _M_lower_bound(__fd);
794	if (__pos->fd == __fd)
795	  __pos->events = 0;
796	_M_notify();
797      }
798
799# ifdef _GLIBCXX_HAVE_POLL_H
800      using __fdvec = vector<::pollfd>;
801
802      // Find first element p such that !(p.fd < __fd)
803      // N.B. always returns a dereferencable iterator.
804      __fdvec::iterator
805      _M_lower_bound(int __fd)
806      {
807	return std::lower_bound(_M_fds.begin(), _M_fds.end() - 1,
808	    __fd, [](const auto& __p, int __fd) { return __p.fd < __fd; });
809      }
810
811      enum __status { _S_retry, _S_timeout, _S_ok, _S_error };
812
813      __status
814      wait(__fdvec& __fds, chrono::milliseconds __timeout)
815      {
816	// XXX not thread-safe!
817	__fds = _M_fds;  // take snapshot to pass to poll()
818
819	int __res = ::poll(__fds.data(), __fds.size(), __timeout.count());
820
821	if (__res == -1)
822	  {
823	    __fds.clear();
824	    if (errno == EINTR)
825	      return _S_retry;
826	    return _S_error; // XXX ???
827	  }
828	else if (__res == 0)
829	  {
830	    __fds.clear();
831	    return _S_timeout;
832	  }
833	else if (__fds.back().revents != 0) // something changed, restart
834	  {
835	    __fds.clear();
836	    _M_on_notify();
837	    return _S_retry;
838	  }
839
840	auto __part = std::stable_partition(__fds.begin(), __fds.end() - 1,
841	      [](const __fdvec::value_type& __p) { return __p.revents != 0; });
842	__fds.erase(__part, __fds.end());
843
844	return _S_ok;
845      }
846
847      __fdvec _M_fds;	// _M_fds.back() is the read end of the self-pipe
848#endif
849      int _M_notify_wr;	// write end of the self-pipe
850    };
851
852    __reactor _M_reactor;
853
854    vector<__timer_queue_base*>			_M_timers;
855    forward_list<unique_ptr<__async_operation>>	_M_ops;
856
857#ifdef _GLIBCXX_HAS_GTHREADS
858    vector<thread::id>	_M_call_stack;
859#else
860    int _M_run_count = 0;
861#endif
862  };
863
864  inline bool
865  operator==(const io_context::executor_type& __a,
866	     const io_context::executor_type& __b) noexcept
867  {
868    // https://github.com/chriskohlhoff/asio-tr2/issues/201
869    using executor_type = io_context::executor_type;
870    return std::addressof(executor_type(__a).context())
871      == std::addressof(executor_type(__b).context());
872  }
873
874  inline bool
875  operator!=(const io_context::executor_type& __a,
876	     const io_context::executor_type& __b) noexcept
877  { return !(__a == __b); }
878
  /// Make the is_executor trait report true for io_context::executor_type.
  template<> struct is_executor<io_context::executor_type> : true_type {};
880
881  /// @}
882
883} // namespace v1
884} // namespace net
885} // namespace experimental
886_GLIBCXX_END_NAMESPACE_VERSION
887} // namespace std
888
889#endif // C++14
890
891#endif // _GLIBCXX_EXPERIMENTAL_IO_SERVICE
892