// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <inttypes.h>

#ifdef __Fuchsia__
#include <fbl/auto_lock.h>
#include <fbl/mutex.h>
#include <fbl/vector.h>
#include <lib/fzl/owned-vmo-mapper.h>
#include <lib/zx/vmo.h>
#endif

#include <fbl/algorithm.h>
#include <fbl/intrusive_hash_table.h>
#include <fbl/intrusive_single_list.h>
#include <fbl/macros.h>
#include <fbl/ref_ptr.h>
#include <fbl/unique_ptr.h>
#include <fs/block-txn.h>
#include <fs/vfs.h>

#include "minfs-private.h"
#include <minfs/writeback.h>

#include <utility>

namespace minfs {

#ifdef __Fuchsia__

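// Enqueues a write from |vmo| to the device, described in Minfs blocks.
// Requests targeting the same VMO are coalesced: if a pending request already
// starts at the same VMO offset, the longer of the two lengths is kept; if the
// new range immediately follows a pending request (in both VMO and device
// offsets), that request is extended. Otherwise a new request is appended.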
void WriteTxn::Enqueue(zx_handle_t vmo, uint64_t vmo_offset, uint64_t dev_offset,
                       uint64_t nblocks) {
    ValidateVmoSize(vmo, static_cast<blk_t>(vmo_offset));
    for (size_t i = 0; i < requests_.size(); i++) {
        if (requests_[i].vmo != vmo) {
            continue;
        }

        if (requests_[i].vmo_offset == vmo_offset) {
            // Take the longer of the operations (if operating on the same
            // blocks).
            requests_[i].length = (requests_[i].length > nblocks) ? requests_[i].length : nblocks;
            return;
        } else if ((requests_[i].vmo_offset + requests_[i].length == vmo_offset) &&
                   (requests_[i].dev_offset + requests_[i].length == dev_offset)) {
            // Combine with the previous request, if immediately following.
            requests_[i].length += nblocks;
            return;
        }
    }

    WriteRequest request;
    request.vmo = vmo;
    // NOTE: It's easier to compare everything when dealing
    // with blocks (not offsets!) so the following are described in
    // terms of blocks until we Flush().
    request.vmo_offset = vmo_offset;
    request.dev_offset = dev_offset;
    request.length = nblocks;
    requests_.push_back(std::move(request));
}

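// Converts the pending requests from Minfs blocks into device blocks and
// issues them to the underlying block device as writes against |vmoid|.
// The pending request list is cleared regardless of the transaction's result.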
zx_status_t WriteTxn::Flush(zx_handle_t vmo, vmoid_t vmoid) {
    ZX_DEBUG_ASSERT(vmo != ZX_HANDLE_INVALID);
    ZX_DEBUG_ASSERT(vmoid != VMOID_INVALID);

    // Update all the outgoing transactions to be in "disk blocks",
    // not "Minfs blocks".
    block_fifo_request_t blk_reqs[requests_.size()];
    const uint32_t kDiskBlocksPerMinfsBlock = kMinfsBlockSize / bc_->DeviceBlockSize();
    for (size_t i = 0; i < requests_.size(); i++) {
        blk_reqs[i].group = bc_->BlockGroupID();
        blk_reqs[i].vmoid = vmoid;
        blk_reqs[i].opcode = BLOCKIO_WRITE;
        blk_reqs[i].vmo_offset = requests_[i].vmo_offset * kDiskBlocksPerMinfsBlock;
        blk_reqs[i].dev_offset = requests_[i].dev_offset * kDiskBlocksPerMinfsBlock;
        // TODO(ZX-2253): Remove this assertion.
        uint64_t length = requests_[i].length * kDiskBlocksPerMinfsBlock;
        ZX_ASSERT_MSG(length < UINT32_MAX, "Too many blocks");
        blk_reqs[i].length = static_cast<uint32_t>(length);
    }

    // Actually send the operations to the underlying block device.
    zx_status_t status = bc_->Transaction(blk_reqs, requests_.size());

    requests_.reset();
    return status;
}

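// Returns the total number of Minfs blocks spanned by all pending requests.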
size_t WriteTxn::BlkCount() const {
    size_t blocks_needed = 0;
    for (size_t i = 0; i < requests_.size(); i++) {
        blocks_needed += requests_[i].length;
    }
    return blocks_needed;
}

#endif  // __Fuchsia__

WritebackWork::WritebackWork(Bcache* bc) : WriteTxn(bc),
#ifdef __Fuchsia__
    closure_(nullptr),
#endif
    node_count_(0) {}

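// Resets this WritebackWork to an empty state, releasing all pinned vnodes
// (and, on Fuchsia, clearing the sync callback). All enqueued requests are
// expected to have been flushed already.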
void WritebackWork::Reset() {
#ifdef __Fuchsia__
    ZX_DEBUG_ASSERT(Requests().size() == 0);
    closure_ = nullptr;
#endif
    while (0 < node_count_) {
        vn_[--node_count_] = nullptr;
    }
}

#ifdef __Fuchsia__
// Flushes the pending requests to |vmo|/|vmoid|, invokes the sync callback
// (if one was set) with the result, and returns the number of blocks of the
// writeback buffer that have been consumed.
size_t WritebackWork::Complete(zx_handle_t vmo, vmoid_t vmoid) {
    size_t blk_count = BlkCount();
    zx_status_t status = Flush(vmo, vmoid);
    if (closure_) {
        closure_(status);
    }
    Reset();
    return blk_count;
}

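// Sets the callback to be invoked with the status of Flush() when this work
// completes. Only one callback may be set per WritebackWork.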
void WritebackWork::SetClosure(SyncCallback closure) {
    ZX_DEBUG_ASSERT(!closure_);
    closure_ = std::move(closure);
}
#else
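// On host there is no writeback buffer; complete the work by issuing the
// pending requests via Transact() and resetting.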
void WritebackWork::Complete() {
    Transact();
    Reset();
}
#endif  // __Fuchsia__

// Allow "pinning" Vnodes so they aren't destroyed while we're completing
// this writeback operation.
void WritebackWork::PinVnode(fbl::RefPtr<VnodeMinfs> vn) {
    for (size_t i = 0; i < node_count_; i++) {
        if (vn_[i].get() == vn.get()) {
            // Already pinned
            return;
        }
    }
    ZX_DEBUG_ASSERT(node_count_ < fbl::count_of(vn_));
    vn_[node_count_++] = std::move(vn);
}

#ifdef __Fuchsia__

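// Creates a WritebackBuffer backed by |mapper|'s VMO. Verifies that the buffer
// is a multiple of the Minfs block size, initializes the producer/consumer
// condition variables, launches the background writeback thread, and attaches
// the VMO to the block device before returning the buffer via |out|.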
zx_status_t WritebackBuffer::Create(Bcache* bc, fzl::OwnedVmoMapper mapper,
                                    fbl::unique_ptr<WritebackBuffer>* out) {
    fbl::unique_ptr<WritebackBuffer> wb(new WritebackBuffer(bc, std::move(mapper)));
    if (wb->mapper_.size() % kMinfsBlockSize != 0) {
        return ZX_ERR_INVALID_ARGS;
    } else if (cnd_init(&wb->consumer_cvar_) != thrd_success) {
        return ZX_ERR_NO_RESOURCES;
    } else if (cnd_init(&wb->producer_cvar_) != thrd_success) {
        return ZX_ERR_NO_RESOURCES;
    } else if (thrd_create_with_name(&wb->writeback_thrd_,
                                     WritebackBuffer::WritebackThread, wb.get(),
                                     "minfs-writeback") != thrd_success) {
        return ZX_ERR_NO_RESOURCES;
    }
    zx_status_t status = wb->bc_->AttachVmo(wb->mapper_.vmo(), &wb->buffer_vmoid_);
    if (status != ZX_OK) {
        return status;
    }

    *out = std::move(wb);
    return ZX_OK;
}

WritebackBuffer::WritebackBuffer(Bcache* bc, fzl::OwnedVmoMapper mapper) :
    bc_(bc), unmounting_(false), mapper_(std::move(mapper)),
    cap_(mapper_.size() / kMinfsBlockSize) {}

WritebackBuffer::~WritebackBuffer() {
    // Signal the background thread that we are unmounting, then block until
    // it exits.
    {
        fbl::AutoLock lock(&writeback_lock_);
        unmounting_ = true;
        cnd_signal(&consumer_cvar_);
    }
    int r;
    thrd_join(writeback_thrd_, &r);

    if (buffer_vmoid_ != VMOID_INVALID) {
        // Detach the writeback buffer's VMO from the block device.
        block_fifo_request_t request;
        request.group = bc_->BlockGroupID();
        request.vmoid = buffer_vmoid_;
        request.opcode = BLOCKIO_CLOSE_VMO;
        bc_->Transaction(&request, 1);
    }
}

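// Blocks until |blocks| blocks of space are available in the writeback buffer,
// queueing behind any producers that are already waiting. Returns
// ZX_ERR_NO_RESOURCES if the request is larger than the entire buffer and can
// never be satisfied.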
zx_status_t WritebackBuffer::EnsureSpaceLocked(size_t blocks) {
    if (blocks > cap_) {
        // There will never be enough room in the writeback buffer
        // for this request.
        return ZX_ERR_NO_RESOURCES;
    }
    while (len_ + blocks > cap_) {
        // Not enough room to write back work, yet. Wait until
        // room is available.
        Waiter w;
        producer_queue_.push(&w);

        do {
            cnd_wait(&producer_cvar_, writeback_lock_.GetInternal());
        } while ((&producer_queue_.front() != &w) && // Keep waiting until we are first in line...
                 (len_ + blocks > cap_)); // ... or until there is enough space for us.

        producer_queue_.pop();
    }
    return ZX_OK;
}

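// Copies the data described by |txn|'s requests out of their source VMOs and
// into the circular writeback buffer, then rewrites each request to point at
// the buffer instead of the source VMO. A request that crosses the end of the
// buffer is split in two, with the second half wrapping around to offset 0.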
void WritebackBuffer::CopyToBufferLocked(WriteTxn* txn) {
    auto& reqs = txn->Requests();
    // Write back to the buffer
    for (size_t i = 0; i < reqs.size(); i++) {
        size_t vmo_offset = reqs[i].vmo_offset;
        size_t dev_offset = reqs[i].dev_offset;
        const size_t vmo_len = reqs[i].length;
        ZX_DEBUG_ASSERT(vmo_len > 0);
        size_t wb_offset = (start_ + len_) % cap_;
        size_t wb_len = (wb_offset + vmo_len > cap_) ? cap_ - wb_offset : vmo_len;
        ZX_DEBUG_ASSERT(wb_len <= vmo_len);
        ZX_DEBUG_ASSERT(wb_offset < cap_);
        zx_handle_t vmo = reqs[i].vmo;

        void* ptr = (void*)((uintptr_t)(mapper_.start()) +
                            (uintptr_t)(wb_offset * kMinfsBlockSize));
        zx_status_t status;
        ZX_DEBUG_ASSERT((start_ <= wb_offset) ?
                        (start_ < wb_offset + wb_len) :
                        (wb_offset + wb_len <= start_)); // Wraparound
        ZX_ASSERT_MSG((status = zx_vmo_read(vmo, ptr, vmo_offset * kMinfsBlockSize,
                      wb_len * kMinfsBlockSize)) == ZX_OK, "VMO Read Fail: %d", status);
        len_ += wb_len;

        // Update the WriteRequest to transfer from the writeback buffer
        // out to disk, rather than the supplied VMO.
        reqs[i].vmo_offset = wb_offset;
        reqs[i].length = wb_len;

        if (wb_len != vmo_len) {
            // We wrapped around; write what remains from this request.
            vmo_offset += wb_len;
            dev_offset += wb_len;
            wb_len = vmo_len - wb_len;
            ptr = mapper_.start();
            ZX_DEBUG_ASSERT((start_ == 0) ? (start_ < wb_len) : (wb_len <= start_)); // Wraparound
            ZX_ASSERT(zx_vmo_read(vmo, ptr, vmo_offset * kMinfsBlockSize,
                                  wb_len * kMinfsBlockSize) == ZX_OK);
            len_ += wb_len;

            // Shift down all following write requests
            static_assert(fbl::is_pod<WriteRequest>::value, "Can't memmove non-POD");

            // Insert the "new" request, which is the latter half of
            // the request we wrote out earlier.
            WriteRequest request;
            request.vmo = reqs[i].vmo;
            request.vmo_offset = 0;
            request.dev_offset = dev_offset;
            request.length = wb_len;
            i++;
            reqs.insert(i, request);
        }
    }
}

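// Queues |work| for the background writeback thread: reserves space in the
// writeback buffer (blocking if necessary), copies the work's data into the
// buffer, appends the work to the queue, and wakes the consumer thread.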
void WritebackBuffer::Enqueue(fbl::unique_ptr<WritebackWork> work) {
    TRACE_DURATION("minfs", "WritebackBuffer::Enqueue");
    TRACE_FLOW_BEGIN("minfs", "writeback", reinterpret_cast<trace_flow_id_t>(work.get()));
    fbl::AutoLock lock(&writeback_lock_);

    {
        TRACE_DURATION("minfs", "Allocating Writeback space");
        size_t blocks = work->BlkCount();
        // TODO(smklein): Experimentally, all filesystem operations cause between
        // 0 and 10 blocks to be updated, though the writeback buffer has space
        // for thousands of blocks.
        //
        // Hypothetically, an operation (most likely, an enormous write) could
        // cause a single operation to exceed the size of the writeback buffer,
        // but this is currently impossible as our writes are broken into 8KB
        // chunks.
        //
        // Regardless, either (1) a fallback mechanism should exist for these
        // extremely large operations, or (2) the worst-case operation should be
        // calculated and proven to always fit within the allocated writeback
        // buffer.
        ZX_ASSERT_MSG(EnsureSpaceLocked(blocks) == ZX_OK,
                      "Requested txn (%zu blocks) larger than writeback buffer", blocks);
    }

    {
        TRACE_DURATION("minfs", "Copying to Writeback buffer");
        CopyToBufferLocked(work.get());
    }

    work_queue_.push(std::move(work));
    cnd_signal(&consumer_cvar_);
}

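// Background consumer loop: pops WritebackWork items off the queue, completes
// them against the writeback buffer's VMO while the lock is dropped, then
// reclaims the consumed buffer space and wakes any waiting producers. Exits
// once the queue is drained and the buffer is unmounting.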
int WritebackBuffer::WritebackThread(void* arg) {
    WritebackBuffer* b = reinterpret_cast<WritebackBuffer*>(arg);

    b->writeback_lock_.Acquire();
    while (true) {
        while (!b->work_queue_.is_empty()) {
            auto work = b->work_queue_.pop();
            TRACE_DURATION("minfs", "WritebackBuffer::WritebackThread");

            // Stay unlocked while processing a unit of work.
            b->writeback_lock_.Release();

            // TODO(smklein): We could add additional validation that the blocks
            // in "work" are contiguous and in the range of [start_, len_) (including
            // wraparound).
            size_t blks_consumed = work->Complete(b->mapper_.vmo().get(), b->buffer_vmoid_);
            TRACE_FLOW_END("minfs", "writeback", reinterpret_cast<trace_flow_id_t>(work.get()));
            work = nullptr;

            // Relock before checking the state of the queue.
            b->writeback_lock_.Acquire();
            b->start_ = (b->start_ + blks_consumed) % b->cap_;
            b->len_ -= blks_consumed;
            cnd_signal(&b->producer_cvar_);
        }

        // Before waiting, we should check if we're unmounting.
        if (b->unmounting_) {
            b->writeback_lock_.Release();
            return 0;
        }
        cnd_wait(&b->consumer_cvar_, b->writeback_lock_.GetInternal());
    }
}

#endif  // __Fuchsia__

} // namespace minfs