join.hpp 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. #pragma once
  2. namespace stream { namespace detail {
  3. namespace join {
  4. template <typename C>
  5. class iterator : public iterator_impl<typename C::value_type> {
  6. public:
  7. typedef iterator_impl<typename C::value_type> super;
  8. iterator(::stream::iterator<C>&& f, ::stream::iterator<C>&& l)
  9. : start_(std::forward<::stream::iterator<C>>(f))
  10. , finish_(std::forward<::stream::iterator<C>>(l))
  11. {
  12. if (start_ != finish_) {
  13. mem_ = *(start_);
  14. curr_ = mem_.begin();
  15. end_ = mem_.end();
  16. advance();
  17. }
  18. }
  19. ~iterator() {}
  20. typename C::value_type operator*() override { return *curr_; }
  21. super& operator++() override {
  22. ++curr_;
  23. advance();
  24. return *this;
  25. }
  26. DELEGATE_ITERATOR_IMPL_BASE(start_)
  27. private:
  28. void advance() {
  29. while (curr_ == end_ && start_ != finish_) {
  30. if ( ++start_ == finish_ ) { break; }
  31. mem_ = *start_;
  32. curr_ = mem_.begin();
  33. end_ = mem_.end();
  34. }
  35. }
  36. ::stream::iterator<C> start_, finish_;
  37. C mem_;
  38. typename C::iterator curr_, end_;
  39. };
  40. }
  41. template <typename C>
  42. class join_stream : public stream_impl<typename C::value_type> {
  43. public:
  44. using T = typename C::value_type;
  45. explicit join_stream(stream_base<C> const& sb) : source_(sb) {}
  46. ~join_stream() override {}
  47. ::stream::iterator<T> begin() override {
  48. return {new join::iterator<C>{source_.begin(), source_.end()}};
  49. }
  50. ::stream::iterator<T> end() override {
  51. return {new join::iterator<C>{source_.end(), source_.end()}};
  52. }
  53. private:
  54. stream_base<C> source_;
  55. };
  56. template <typename C>
  57. auto make_join(stream_base<C> const& sb) -> stream_base<typename C::value_type> {
  58. using T = typename C::value_type;
  59. using impl_t = join_stream<C>;
  60. return {std::make_shared<impl_t>(sb)};
  61. }
  62. template <typename T>
  63. template <typename F>
  64. auto stream_base<T>::flatmap(F&& func) const -> stream_base<flatmap_f<F>> {
  65. return make_join((*this).map(func));
  66. }
  67. } }