join.hpp 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. #pragma once
  2. #include <iterator>
  3. namespace stream { namespace detail {
  4. #define JOIN_CTOR( mod ) \
  5. start_ = mod(other.start_); \
  6. finish_ = mod(other.finish_); \
  7. std::ptrdiff_t n = std::distance(other.mem_.cbegin(), other.curr_); \
  8. mem_ = mod(other.mem_); \
  9. curr_ = mem_.begin(); \
  10. std::advance(curr_, n); \
  11. end_ = mem_.end();
  12. namespace join {
  13. template <typename C>
  14. class iterator {
  15. public:
  16. iterator(::stream::iterator<C>&& f, ::stream::iterator<C>&& l)
  17. : start_(std::forward<::stream::iterator<C>>(f))
  18. , finish_(std::forward<::stream::iterator<C>>(l))
  19. {
  20. if (start_ != finish_) {
  21. mem_ = *(start_);
  22. curr_ = mem_.begin();
  23. end_ = mem_.end();
  24. advance();
  25. }
  26. }
  27. iterator(iterator const & other) {
  28. JOIN_CTOR( )
  29. }
  30. iterator(iterator && other) {
  31. JOIN_CTOR( std::move )
  32. }
  33. iterator & operator=(iterator const & other) {
  34. JOIN_CTOR( )
  35. return *this;
  36. }
  37. iterator & operator=(iterator && other) {
  38. JOIN_CTOR( std::move )
  39. return *this;
  40. }
  41. ~iterator() {}
  42. typename C::value_type operator*() { return *curr_; }
  43. iterator& operator++() {
  44. ++curr_;
  45. advance();
  46. return *this;
  47. }
  48. DELEGATE_ITERATOR_IMPL_BASE(start_)
  49. private:
  50. void advance() {
  51. while (curr_ == end_ && start_ != finish_) {
  52. if ( ++start_ == finish_ ) { break; }
  53. mem_ = *start_;
  54. curr_ = mem_.begin();
  55. end_ = mem_.end();
  56. }
  57. }
  58. ::stream::iterator<C> start_, finish_;
  59. C mem_;
  60. typename C::const_iterator curr_, end_;
  61. };
  62. }
  63. template <typename C>
  64. class join_stream : public stream_impl<typename C::value_type> {
  65. public:
  66. using T = typename C::value_type;
  67. explicit join_stream(stream_base<C> const& sb) : source_(sb) {}
  68. ~join_stream() override {}
  69. ::stream::iterator<T> begin() override {
  70. return {join::iterator<C>{source_.begin(), source_.end()}};
  71. }
  72. ::stream::iterator<T> end() override {
  73. return {join::iterator<C>{source_.end(), source_.end()}};
  74. }
  75. private:
  76. stream_base<C> source_;
  77. };
  78. template <typename C>
  79. auto make_join(stream_base<C> const& sb) -> stream_base<typename C::value_type> {
  80. using T = typename C::value_type;
  81. using impl_t = join_stream<C>;
  82. return {std::make_shared<impl_t>(sb)};
  83. }
  84. template <typename T>
  85. template <typename F>
  86. auto stream_base<T>::flatmap(F&& func) const -> stream_base<flatmap_f<F>> {
  87. return make_join((*this).map(func));
  88. }
  89. } }