Line data Source code
1 : //
2 : // Copyright (c) 2019 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2024 Christian Mazakas
4 : // Copyright (c) 2024 Mohammad Nejati
5 : //
6 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
7 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8 : //
9 : // Official repository: https://github.com/cppalliance/http_proto
10 : //
11 :
12 : #include <boost/http_proto/detail/except.hpp>
13 : #include <boost/http_proto/detail/header.hpp>
14 : #include <boost/http_proto/message_view_base.hpp>
15 : #include <boost/http_proto/serializer.hpp>
16 :
17 : #include "src/detail/array_of_const_buffers.hpp"
18 : #include "src/detail/brotli_filter_base.hpp"
19 : #include "src/detail/buffer_utils.hpp"
20 : #include "src/detail/zlib_filter_base.hpp"
21 :
22 : #include <boost/buffers/circular_buffer.hpp>
23 : #include <boost/buffers/copy.hpp>
24 : #include <boost/core/bit.hpp>
25 : #include <boost/core/ignore_unused.hpp>
26 : #include <boost/rts/brotli/encode.hpp>
27 : #include <boost/rts/context.hpp>
28 : #include <boost/rts/zlib/compression_method.hpp>
29 : #include <boost/rts/zlib/compression_strategy.hpp>
30 : #include <boost/rts/zlib/deflate.hpp>
31 : #include <boost/rts/zlib/error.hpp>
32 : #include <boost/rts/zlib/flush.hpp>
33 :
34 : #include <stddef.h>
35 :
36 : namespace boost {
37 : namespace http_proto {
38 :
39 : namespace {
40 :
41 : const
42 : buffers::const_buffer
43 : crlf_and_final_chunk = {"\r\n0\r\n\r\n", 7};
44 :
45 : const
46 : buffers::const_buffer
47 : crlf = {"\r\n", 2};
48 :
49 : const
50 : buffers::const_buffer
51 : final_chunk = {"0\r\n\r\n", 5};
52 :
53 : constexpr
54 : std::uint8_t
55 76 : chunk_header_len(
56 : std::size_t max_chunk_size) noexcept
57 : {
58 : return
59 : static_cast<uint8_t>(
60 76 : (core::bit_width(max_chunk_size) + 3) / 4 +
61 76 : 2); // crlf
62 : };
63 :
64 : void
65 1146 : write_chunk_header(
66 : const buffers::mutable_buffer_pair& mbs,
67 : std::size_t size) noexcept
68 : {
69 : static constexpr char hexdig[] =
70 : "0123456789ABCDEF";
71 : char buf[18];
72 1146 : auto p = buf + 16;
73 1146 : auto const n = buffers::size(mbs);
74 5729 : for(std::size_t i = n - 2; i--;)
75 : {
76 4583 : *--p = hexdig[size & 0xf];
77 4583 : size >>= 4;
78 : }
79 1146 : buf[16] = '\r';
80 1146 : buf[17] = '\n';
81 1146 : auto copied = buffers::copy(
82 : mbs,
83 2292 : buffers::const_buffer(p, n));
84 : ignore_unused(copied);
85 1146 : BOOST_ASSERT(copied == n);
86 1146 : }
87 :
88 : //------------------------------------------------
89 :
90 : class zlib_filter
91 : : public detail::zlib_filter_base
92 : {
93 : rts::zlib::deflate_service& svc_;
94 :
95 : public:
96 52 : zlib_filter(
97 : const rts::context& ctx,
98 : http_proto::detail::workspace& ws,
99 : int comp_level,
100 : int window_bits,
101 : int mem_level)
102 52 : : zlib_filter_base(ws)
103 52 : , svc_(ctx.get_service<rts::zlib::deflate_service>())
104 : {
105 104 : system::error_code ec = static_cast<rts::zlib::error>(svc_.init2(
106 52 : strm_,
107 : comp_level,
108 : rts::zlib::deflated,
109 : window_bits,
110 : mem_level,
111 52 : rts::zlib::default_strategy));
112 52 : if(ec != rts::zlib::error::ok)
113 0 : detail::throw_system_error(ec);
114 52 : }
115 :
116 : private:
117 : virtual
118 : std::size_t
119 86 : min_out_buffer() const noexcept override
120 : {
121 : // Prevents deflate from producing
122 : // zero output due to small buffer
123 86 : return 8;
124 : }
125 :
126 : virtual
127 : results
128 4914 : do_process(
129 : buffers::mutable_buffer out,
130 : buffers::const_buffer in,
131 : bool more) noexcept override
132 : {
133 4914 : strm_.next_out = static_cast<unsigned char*>(out.data());
134 4914 : strm_.avail_out = saturate_cast(out.size());
135 4914 : strm_.next_in = static_cast<unsigned char*>(const_cast<void *>(in.data()));
136 4914 : strm_.avail_in = saturate_cast(in.size());
137 :
138 : auto rs = static_cast<rts::zlib::error>(
139 4914 : svc_.deflate(
140 4914 : strm_,
141 : more ? rts::zlib::no_flush : rts::zlib::finish));
142 :
143 4914 : results rv;
144 4914 : rv.out_bytes = saturate_cast(out.size()) - strm_.avail_out;
145 4914 : rv.in_bytes = saturate_cast(in.size()) - strm_.avail_in;
146 4914 : rv.finished = (rs == rts::zlib::error::stream_end);
147 :
148 4914 : if(rs < rts::zlib::error::ok && rs != rts::zlib::error::buf_err)
149 0 : rv.ec = rs;
150 :
151 4914 : return rv;
152 : }
153 : };
154 :
155 : class brotli_filter
156 : : public detail::brotli_filter_base
157 : {
158 : rts::brotli::encode_service& svc_;
159 : rts::brotli::encoder_state* state_;
160 :
161 : public:
162 0 : brotli_filter(
163 : const rts::context& ctx,
164 : http_proto::detail::workspace&,
165 : std::uint32_t comp_quality,
166 : std::uint32_t comp_window)
167 0 : : svc_(ctx.get_service<rts::brotli::encode_service>())
168 : {
169 : // TODO: use custom allocator
170 0 : state_ = svc_.create_instance(nullptr, nullptr, nullptr);
171 0 : if(!state_)
172 0 : detail::throw_bad_alloc();
173 : using encoder_parameter = rts::brotli::encoder_parameter;
174 0 : svc_.set_parameter(state_, encoder_parameter::quality, comp_quality);
175 0 : svc_.set_parameter(state_, encoder_parameter::lgwin, comp_window);
176 0 : }
177 :
178 0 : ~brotli_filter()
179 0 : {
180 0 : svc_.destroy_instance(state_);
181 0 : }
182 :
183 : private:
184 : virtual
185 : results
186 0 : do_process(
187 : buffers::mutable_buffer out,
188 : buffers::const_buffer in,
189 : bool more) noexcept override
190 : {
191 0 : auto* next_in = reinterpret_cast<const std::uint8_t*>(in.data());
192 0 : auto available_in = in.size();
193 0 : auto* next_out = reinterpret_cast<std::uint8_t*>(out.data());
194 0 : auto available_out = out.size();
195 :
196 : using encoder_operation =
197 : rts::brotli::encoder_operation;
198 :
199 0 : bool rs = svc_.compress_stream(
200 : state_,
201 : more ? encoder_operation::process : encoder_operation::finish,
202 : &available_in,
203 : &next_in,
204 : &available_out,
205 : &next_out,
206 : nullptr);
207 :
208 0 : results rv;
209 0 : rv.in_bytes = in.size() - available_in;
210 0 : rv.out_bytes = out.size() - available_out;
211 0 : rv.finished = svc_.is_finished(state_);
212 :
213 : // TODO: use proper error code
214 0 : if(rs == false)
215 0 : rv.ec = error::bad_payload;
216 :
217 0 : return rv;
218 : }
219 : };
220 :
221 : template<class UInt>
222 : std::size_t
223 8 : clamp(
224 : UInt x,
225 : std::size_t limit = (std::numeric_limits<
226 : std::size_t>::max)()) noexcept
227 : {
228 8 : if(x >= limit)
229 2 : return limit;
230 6 : return static_cast<std::size_t>(x);
231 : }
232 :
233 : class serializer_service
234 : : public rts::service
235 : {
236 : public:
237 : serializer::config cfg;
238 : std::size_t space_needed = 0;
239 :
240 24 : serializer_service(
241 : const rts::context&,
242 : serializer::config const& cfg_)
243 24 : : cfg(cfg_)
244 : {
245 24 : space_needed += cfg.payload_buffer;
246 24 : space_needed += cfg.max_type_erase;
247 :
248 24 : if(cfg.apply_deflate_encoder || cfg.apply_gzip_encoder)
249 : {
250 : // TODO: Account for the number of allocations and
251 : // their overhead in the workspace.
252 :
253 : // https://www.zlib.net/zlib_tech.html
254 1 : space_needed +=
255 1 : (1 << (cfg.zlib_window_bits + 2)) +
256 1 : (1 << (cfg.zlib_mem_level + 9)) +
257 1 : (6 * 1024) +
258 : #ifdef __s390x__
259 : 5768 +
260 : #endif
261 1 : detail::workspace::space_needed<zlib_filter>();
262 : }
263 24 : }
264 : };
265 :
266 : } // namespace
267 :
268 : //------------------------------------------------
269 :
270 : void
271 24 : install_serializer_service(
272 : rts::context& ctx,
273 : serializer::config const& cfg)
274 : {
275 24 : ctx.make_service<serializer_service>(cfg);
276 24 : }
277 :
278 : //------------------------------------------------
279 :
280 : class serializer::impl
281 : {
282 : friend stream;
283 :
284 : enum class state
285 : {
286 : reset,
287 : start,
288 : header,
289 : body
290 : };
291 :
292 : enum class style
293 : {
294 : empty,
295 : buffers,
296 : source,
297 : stream
298 : };
299 :
300 : const rts::context& ctx_;
301 : serializer_service& svc_;
302 : detail::workspace ws_;
303 :
304 : detail::filter* filter_ = nullptr;
305 : cbs_gen* cbs_gen_ = nullptr;
306 : source* source_ = nullptr;
307 :
308 : buffers::circular_buffer out_;
309 : buffers::circular_buffer in_;
310 : detail::array_of_const_buffers prepped_;
311 : buffers::const_buffer tmp_;
312 :
313 : state state_ = state::start;
314 : style style_ = style::empty;
315 : uint8_t chunk_header_len_ = 0;
316 : bool more_input_ = false;
317 : bool is_chunked_ = false;
318 : bool needs_exp100_continue_ = false;
319 : bool filter_done_ = false;
320 :
321 : public:
322 24 : impl(const rts::context& ctx)
323 24 : : ctx_(ctx)
324 24 : , svc_(ctx_.get_service<serializer_service>())
325 24 : , ws_(svc_.space_needed)
326 : {
327 24 : }
328 :
329 : void
330 82 : reset() noexcept
331 : {
332 82 : ws_.clear();
333 82 : state_ = state::start;
334 82 : }
335 :
336 : auto
337 2426 : prepare() ->
338 : system::result<const_buffers_type>
339 : {
340 : // Precondition violation
341 2426 : if(state_ < state::header)
342 2 : detail::throw_logic_error();
343 :
344 : // Expect: 100-continue
345 2424 : if(needs_exp100_continue_)
346 : {
347 4 : if(!is_header_done())
348 4 : return const_buffers_type(
349 : prepped_.begin(),
350 2 : 1); // limit to header
351 :
352 2 : needs_exp100_continue_ = false;
353 :
354 2 : BOOST_HTTP_PROTO_RETURN_EC(
355 : error::expect_100_continue);
356 : }
357 :
358 2420 : if(!filter_)
359 : {
360 62 : switch(style_)
361 : {
362 6 : case style::empty:
363 6 : break;
364 :
365 20 : case style::buffers:
366 : {
367 : // add more buffers if prepped_ is half empty.
368 30 : if(more_input_ &&
369 10 : prepped_.capacity() >= prepped_.size())
370 : {
371 4 : prepped_.slide_to_front();
372 50 : while(prepped_.capacity() != 0)
373 : {
374 48 : auto buf = cbs_gen_->next();
375 48 : if(buf.size() == 0)
376 2 : break;
377 46 : prepped_.append(buf);
378 : }
379 4 : if(cbs_gen_->is_empty())
380 : {
381 2 : if(is_chunked_)
382 : {
383 1 : if(prepped_.capacity() != 0)
384 : {
385 1 : prepped_.append(
386 : crlf_and_final_chunk);
387 1 : more_input_ = false;
388 : }
389 : }
390 : else
391 : {
392 1 : more_input_ = false;
393 : }
394 : }
395 : }
396 20 : return detail::make_span(prepped_);
397 : }
398 :
399 23 : case style::source:
400 : {
401 23 : if(out_capacity() == 0 || !more_input_)
402 22 : break;
403 :
404 7 : const auto rs = source_->read(
405 7 : out_prepare());
406 :
407 7 : out_commit(rs.bytes);
408 :
409 7 : if(rs.ec.failed())
410 : {
411 1 : ws_.clear();
412 1 : state_ = state::reset;
413 1 : return rs.ec;
414 : }
415 :
416 6 : if(rs.finished)
417 : {
418 6 : more_input_ = false;
419 6 : out_finish();
420 : }
421 :
422 6 : break;
423 : }
424 :
425 13 : case style::stream:
426 13 : if(out_.size() == 0 && is_header_done() && more_input_)
427 3 : BOOST_HTTP_PROTO_RETURN_EC(
428 : error::need_data);
429 10 : break;
430 : }
431 : }
432 : else // filter
433 : {
434 2358 : switch(style_)
435 : {
436 4 : case style::empty:
437 : {
438 4 : if(out_capacity() == 0 || filter_done_)
439 4 : break;
440 :
441 12 : const auto rs = filter_->process(
442 4 : detail::make_span(out_prepare()),
443 : {}, // empty input
444 : false);
445 :
446 4 : if(rs.ec.failed())
447 : {
448 0 : ws_.clear();
449 0 : state_ = state::reset;
450 0 : return rs.ec;
451 : }
452 :
453 4 : out_commit(rs.out_bytes);
454 :
455 4 : if(rs.finished)
456 : {
457 4 : filter_done_ = true;
458 4 : out_finish();
459 : }
460 :
461 4 : break;
462 : }
463 :
464 376 : case style::buffers:
465 : {
466 2572 : while(out_capacity() != 0 && !filter_done_)
467 : {
468 2196 : if(more_input_ && tmp_.size() == 0)
469 : {
470 2008 : tmp_ = cbs_gen_->next();
471 2008 : if(tmp_.size() == 0) // cbs_gen_ is empty
472 16 : more_input_ = false;
473 : }
474 :
475 4392 : const auto rs = filter_->process(
476 2196 : detail::make_span(out_prepare()),
477 2196 : { tmp_, {} },
478 2196 : more_input_);
479 :
480 2196 : if(rs.ec.failed())
481 : {
482 0 : ws_.clear();
483 0 : state_ = state::reset;
484 0 : return rs.ec;
485 : }
486 :
487 2196 : buffers::trim_front(tmp_, rs.in_bytes);
488 2196 : out_commit(rs.out_bytes);
489 :
490 2196 : if(rs.out_short)
491 0 : break;
492 :
493 2196 : if(rs.finished)
494 : {
495 16 : filter_done_ = true;
496 16 : out_finish();
497 : }
498 : }
499 376 : break;
500 : }
501 :
502 734 : case style::source:
503 : {
504 2180 : while(out_capacity() != 0 && !filter_done_)
505 : {
506 1446 : if(more_input_ && in_.capacity() != 0)
507 : {
508 984 : const auto rs = source_->read(
509 984 : in_.prepare(in_.capacity()));
510 984 : if(rs.ec.failed())
511 : {
512 0 : ws_.clear();
513 0 : state_ = state::reset;
514 0 : return rs.ec;
515 : }
516 984 : if(rs.finished)
517 16 : more_input_ = false;
518 984 : in_.commit(rs.bytes);
519 : }
520 :
521 1446 : const auto rs = filter_->process(
522 1446 : detail::make_span(out_prepare()),
523 : in_.data(),
524 1446 : more_input_);
525 :
526 1446 : if(rs.ec.failed())
527 : {
528 0 : ws_.clear();
529 0 : state_ = state::reset;
530 0 : return rs.ec;
531 : }
532 :
533 1446 : in_.consume(rs.in_bytes);
534 1446 : out_commit(rs.out_bytes);
535 :
536 1446 : if(rs.out_short)
537 0 : break;
538 :
539 1446 : if(rs.finished)
540 : {
541 16 : filter_done_ = true;
542 16 : out_finish();
543 : }
544 : }
545 734 : break;
546 : }
547 :
548 1244 : case style::stream:
549 : {
550 1244 : if(out_capacity() == 0 || filter_done_)
551 804 : break;
552 :
553 1244 : const auto rs = filter_->process(
554 1244 : detail::make_span(out_prepare()),
555 : in_.data(),
556 1244 : more_input_);
557 :
558 1244 : if(rs.ec.failed())
559 : {
560 0 : ws_.clear();
561 0 : state_ = state::reset;
562 440 : return rs.ec;
563 : }
564 :
565 1244 : in_.consume(rs.in_bytes);
566 1244 : out_commit(rs.out_bytes);
567 :
568 1244 : if(rs.finished)
569 : {
570 16 : filter_done_ = true;
571 16 : out_finish();
572 : }
573 :
574 1244 : if(out_.size() == 0 && is_header_done() && more_input_)
575 440 : BOOST_HTTP_PROTO_RETURN_EC(
576 : error::need_data);
577 804 : break;
578 : }
579 : }
580 : }
581 :
582 1956 : prepped_.reset(!is_header_done());
583 5868 : for(auto const& cb : out_.data())
584 : {
585 3912 : if(cb.size() != 0)
586 1948 : prepped_.append(cb);
587 : }
588 1956 : return detail::make_span(prepped_);
589 : }
590 :
591 : void
592 3717 : consume(
593 : std::size_t n)
594 : {
595 : // Precondition violation
596 3717 : if(state_ < state::header)
597 1 : detail::throw_logic_error();
598 :
599 3716 : if(!is_header_done())
600 : {
601 : const auto header_remain =
602 85 : prepped_[0].size();
603 85 : if(n < header_remain)
604 : {
605 12 : prepped_.consume(n);
606 12 : return;
607 : }
608 73 : n -= header_remain;
609 73 : prepped_.consume(header_remain);
610 73 : state_ = state::body;
611 : }
612 :
613 3704 : prepped_.consume(n);
614 :
615 : // no-op when out_ is not in use
616 3704 : out_.consume(n);
617 :
618 3704 : if(!prepped_.empty())
619 1759 : return;
620 :
621 1945 : if(more_input_)
622 1837 : return;
623 :
624 108 : if(filter_ && !filter_done_)
625 34 : return;
626 :
627 74 : if(needs_exp100_continue_)
628 1 : return;
629 :
630 : // ready for next message
631 73 : reset();
632 : }
633 :
634 : void
635 84 : start_init(
636 : message_view_base const& m)
637 : {
638 : // Precondition violation
639 84 : if(state_ != state::start)
640 1 : detail::throw_logic_error();
641 :
642 : // TODO: To uphold the strong exception guarantee,
643 : // `state_` must be reset to `state::start` if an
644 : // exception is thrown during the start operation.
645 83 : state_ = state::header;
646 :
647 : // VFALCO what do we do with
648 : // metadata error code failures?
649 : // m.ph_->md.maybe_throw();
650 :
651 83 : auto const& md = m.metadata();
652 83 : needs_exp100_continue_ = md.expect.is_100_continue;
653 :
654 : // Transfer-Encoding
655 83 : is_chunked_ = md.transfer_encoding.is_chunked;
656 :
657 : // Content-Encoding
658 83 : switch (md.content_encoding.coding)
659 : {
660 26 : case content_coding::deflate:
661 26 : if(!svc_.cfg.apply_deflate_encoder)
662 0 : goto no_filter;
663 52 : filter_ = &ws_.emplace<zlib_filter>(
664 : ctx_,
665 26 : ws_,
666 26 : svc_.cfg.zlib_comp_level,
667 26 : svc_.cfg.zlib_window_bits,
668 26 : svc_.cfg.zlib_mem_level);
669 26 : filter_done_ = false;
670 26 : break;
671 :
672 26 : case content_coding::gzip:
673 26 : if(!svc_.cfg.apply_gzip_encoder)
674 0 : goto no_filter;
675 52 : filter_ = &ws_.emplace<zlib_filter>(
676 : ctx_,
677 26 : ws_,
678 26 : svc_.cfg.zlib_comp_level,
679 52 : svc_.cfg.zlib_window_bits + 16,
680 26 : svc_.cfg.zlib_mem_level);
681 26 : filter_done_ = false;
682 26 : break;
683 :
684 0 : case content_coding::br:
685 0 : if(!svc_.cfg.apply_brotli_encoder)
686 0 : goto no_filter;
687 0 : filter_ = &ws_.emplace<brotli_filter>(
688 : ctx_,
689 0 : ws_,
690 0 : svc_.cfg.brotli_comp_quality,
691 0 : svc_.cfg.brotli_comp_window);
692 0 : filter_done_ = false;
693 0 : break;
694 :
695 0 : no_filter:
696 31 : default:
697 31 : filter_ = nullptr;
698 31 : break;
699 : }
700 83 : }
701 :
702 : void
703 12 : start_empty(
704 : message_view_base const& m)
705 : {
706 12 : start_init(m);
707 11 : style_ = style::empty;
708 :
709 11 : prepped_ = make_array(
710 : 1 + // header
711 : 2); // out buffer pairs
712 :
713 11 : out_init();
714 :
715 11 : if(!filter_)
716 7 : out_finish();
717 :
718 11 : prepped_.append({ m.ph_->cbuf, m.ph_->size });
719 11 : more_input_ = false;
720 11 : }
721 :
722 : void
723 24 : start_buffers(
724 : message_view_base const& m,
725 : cbs_gen& cbs_gen)
726 : {
727 : // start_init() already called
728 24 : style_ = style::buffers;
729 24 : cbs_gen_ = &cbs_gen;
730 :
731 24 : if(!filter_)
732 : {
733 8 : auto stats = cbs_gen_->stats();
734 8 : auto batch_size = clamp(stats.count, 16);
735 :
736 0 : prepped_ = make_array(
737 : 1 + // header
738 8 : batch_size + // buffers
739 8 : (is_chunked_ ? 2 : 0)); // chunk header + final chunk
740 :
741 8 : prepped_.append({ m.ph_->cbuf, m.ph_->size });
742 8 : more_input_ = (batch_size != 0);
743 :
744 8 : if(is_chunked_)
745 : {
746 2 : if(!more_input_)
747 : {
748 1 : prepped_.append(final_chunk);
749 : }
750 : else
751 : {
752 1 : auto h_len = chunk_header_len(stats.size);
753 : buffers::mutable_buffer mb(
754 1 : ws_.reserve_front(h_len), h_len);
755 1 : write_chunk_header({ mb, {} }, stats.size);
756 1 : prepped_.append(mb);
757 : }
758 : }
759 8 : return;
760 : }
761 :
762 : // filter
763 :
764 16 : prepped_ = make_array(
765 : 1 + // header
766 : 2); // out buffer pairs
767 :
768 16 : out_init();
769 :
770 16 : prepped_.append({ m.ph_->cbuf, m.ph_->size });
771 16 : tmp_ = {};
772 16 : more_input_ = true;
773 : }
774 :
775 : void
776 25 : start_source(
777 : message_view_base const& m,
778 : source& source)
779 : {
780 : // start_init() already called
781 25 : style_ = style::source;
782 25 : source_ = &source;
783 :
784 25 : prepped_ = make_array(
785 : 1 + // header
786 : 2); // out buffer pairs
787 :
788 25 : if(filter_)
789 : {
790 : // TODO: smarter buffer distribution
791 16 : auto const n = (ws_.size() - 1) / 2;
792 16 : in_ = { ws_.reserve_front(n), n };
793 : }
794 :
795 25 : out_init();
796 :
797 25 : prepped_.append({ m.ph_->cbuf, m.ph_->size });
798 25 : more_input_ = true;
799 25 : }
800 :
801 : stream
802 23 : start_stream(message_view_base const& m)
803 : {
804 23 : start_init(m);
805 23 : style_ = style::stream;
806 :
807 23 : prepped_ = make_array(
808 : 1 + // header
809 : 2); // out buffer pairs
810 :
811 23 : if(filter_)
812 : {
813 : // TODO: smarter buffer distribution
814 16 : auto const n = (ws_.size() - 1) / 2;
815 16 : in_ = { ws_.reserve_front(n), n };
816 : }
817 :
818 23 : out_init();
819 :
820 23 : prepped_.append({ m.ph_->cbuf, m.ph_->size });
821 23 : more_input_ = true;
822 23 : return stream{ this };
823 : }
824 :
825 : bool
826 2479 : is_done() const noexcept
827 : {
828 2479 : return state_ == state::start;
829 : }
830 :
831 : detail::workspace&
832 49 : ws() noexcept
833 : {
834 49 : return ws_;
835 : }
836 :
837 : private:
838 : bool
839 6123 : is_header_done() const noexcept
840 : {
841 6123 : return state_ == state::body;
842 : }
843 :
844 : detail::array_of_const_buffers
845 83 : make_array(std::size_t n)
846 : {
847 83 : BOOST_ASSERT(n <= std::uint16_t(-1));
848 :
849 : return {
850 83 : ws_.push_array(n,
851 0 : buffers::const_buffer{}),
852 83 : static_cast<std::uint16_t>(n) };
853 : }
854 :
855 : void
856 75 : out_init()
857 : {
858 : // use all the remaining buffer
859 75 : auto const n = ws_.size() - 1;
860 75 : out_ = { ws_.reserve_front(n), n };
861 75 : chunk_header_len_ =
862 75 : chunk_header_len(out_.capacity());
863 75 : if(out_capacity() == 0)
864 0 : detail::throw_length_error();
865 75 : }
866 :
867 : buffers::mutable_buffer_pair
868 4903 : out_prepare() noexcept
869 : {
870 4903 : auto mbp = out_.prepare(out_.capacity());
871 4903 : if(is_chunked_)
872 : {
873 2451 : buffers::trim_front(
874 2451 : mbp, chunk_header_len_);
875 2451 : buffers::trim_back(
876 : mbp, crlf_and_final_chunk.size());
877 : }
878 4903 : return mbp;
879 : }
880 :
881 : void
882 4903 : out_commit(
883 : std::size_t n) noexcept
884 : {
885 4903 : if(is_chunked_)
886 : {
887 2451 : if(n == 0)
888 1306 : return;
889 :
890 1145 : write_chunk_header(out_.prepare(chunk_header_len_), n);
891 1145 : out_.commit(chunk_header_len_);
892 :
893 1145 : out_.prepare(n);
894 1145 : out_.commit(n);
895 :
896 1145 : buffers::copy(out_.prepare(crlf.size()), crlf);
897 1145 : out_.commit(crlf.size());
898 : }
899 : else
900 : {
901 2452 : out_.commit(n);
902 : }
903 : }
904 :
905 : std::size_t
906 6122 : out_capacity() const noexcept
907 : {
908 6122 : if(is_chunked_)
909 : {
910 3058 : auto const overhead = chunk_header_len_ +
911 3058 : crlf_and_final_chunk.size();
912 3058 : if(out_.capacity() < overhead)
913 541 : return 0;
914 2517 : return out_.capacity() - overhead;
915 : }
916 3064 : return out_.capacity();
917 : }
918 :
919 : void
920 72 : out_finish() noexcept
921 : {
922 72 : if(is_chunked_)
923 : {
924 33 : buffers::copy(
925 33 : out_.prepare(final_chunk.size()), final_chunk);
926 33 : out_.commit(final_chunk.size());
927 : }
928 72 : }
929 : };
930 :
931 : //------------------------------------------------
932 :
933 24 : serializer::
934 24 : serializer(const rts::context& ctx)
935 24 : : impl_(new impl(ctx))
936 : {
937 : // TODO: use a single allocation for
938 : // impl and workspace buffer.
939 24 : }
940 :
941 1 : serializer::
942 1 : serializer(serializer&& other) noexcept
943 1 : : impl_(other.impl_)
944 : {
945 1 : other.impl_ = nullptr;
946 1 : }
947 :
948 25 : serializer::
949 : ~serializer()
950 : {
951 25 : delete impl_;
952 25 : }
953 :
954 : void
955 9 : serializer::
956 : reset() noexcept
957 : {
958 9 : BOOST_ASSERT(impl_);
959 9 : impl_->reset();
960 9 : }
961 :
962 : void
963 12 : serializer::
964 : start(message_view_base const& m)
965 : {
966 12 : BOOST_ASSERT(impl_);
967 12 : impl_->start_empty(m);
968 11 : }
969 :
970 : auto
971 23 : serializer::
972 : start_stream(
973 : message_view_base const& m) -> stream
974 : {
975 23 : BOOST_ASSERT(impl_);
976 23 : return impl_->start_stream(m);
977 : }
978 :
979 : auto
980 2426 : serializer::
981 : prepare() ->
982 : system::result<const_buffers_type>
983 : {
984 2426 : BOOST_ASSERT(impl_);
985 2426 : return impl_->prepare();
986 : }
987 :
988 : void
989 3717 : serializer::
990 : consume(std::size_t n)
991 : {
992 3717 : BOOST_ASSERT(impl_);
993 3717 : impl_->consume(n);
994 3716 : }
995 :
996 : bool
997 2479 : serializer::
998 : is_done() const noexcept
999 : {
1000 2479 : BOOST_ASSERT(impl_);
1001 2479 : return impl_->is_done();
1002 : }
1003 :
1004 : //------------------------------------------------
1005 :
1006 : detail::workspace&
1007 49 : serializer::
1008 : ws()
1009 : {
1010 49 : BOOST_ASSERT(impl_);
1011 49 : return impl_->ws();
1012 : }
1013 :
1014 : void
1015 49 : serializer::
1016 : start_init(message_view_base const& m)
1017 : {
1018 49 : BOOST_ASSERT(impl_);
1019 49 : impl_->start_init(m);
1020 49 : }
1021 :
1022 : void
1023 24 : serializer::
1024 : start_buffers(
1025 : message_view_base const& m,
1026 : cbs_gen& cbs_gen)
1027 : {
1028 24 : BOOST_ASSERT(impl_);
1029 24 : impl_->start_buffers(m, cbs_gen);
1030 24 : }
1031 :
1032 : void
1033 25 : serializer::
1034 : start_source(
1035 : message_view_base const& m,
1036 : source& source)
1037 : {
1038 25 : BOOST_ASSERT(impl_);
1039 25 : impl_->start_source(m, source);
1040 25 : }
1041 :
1042 : //------------------------------------------------
1043 :
1044 : std::size_t
1045 1257 : serializer::
1046 : stream::
1047 : capacity() const
1048 : {
1049 : // Precondition violation
1050 1257 : if(!is_open())
1051 1 : detail::throw_logic_error();
1052 :
1053 1256 : if(impl_->filter_)
1054 1232 : return impl_->in_.capacity();
1055 :
1056 24 : return impl_->out_capacity();
1057 : }
1058 :
1059 : auto
1060 1239 : serializer::
1061 : stream::
1062 : prepare() ->
1063 : mutable_buffers_type
1064 : {
1065 : // Precondition violation
1066 1239 : if(!is_open())
1067 1 : detail::throw_logic_error();
1068 :
1069 1238 : if(impl_->filter_)
1070 1232 : return impl_->in_.prepare(
1071 2464 : impl_->in_.capacity());
1072 :
1073 6 : return impl_->out_prepare();
1074 : }
1075 :
1076 : void
1077 1240 : serializer::
1078 : stream::
1079 : commit(std::size_t n)
1080 : {
1081 : // Precondition violation
1082 1240 : if(!is_open())
1083 1 : detail::throw_logic_error();
1084 :
1085 : // Precondition violation
1086 1239 : if(n > capacity())
1087 1 : detail::throw_invalid_argument();
1088 :
1089 1238 : if(impl_->filter_)
1090 1232 : return impl_->in_.commit(n);
1091 :
1092 6 : impl_->out_commit(n);
1093 : }
1094 :
1095 : void
1096 47 : serializer::
1097 : stream::
1098 : close() noexcept
1099 : {
1100 47 : if(!is_open())
1101 24 : return; // no-op;
1102 :
1103 23 : if(!impl_->filter_)
1104 7 : impl_->out_finish();
1105 :
1106 23 : impl_->more_input_ = false;
1107 23 : impl_ = nullptr;
1108 : }
1109 :
1110 : } // http_proto
1111 : } // boost
|